Removed the initFromPartial() method from the new aggregator interface; added AvgFieldMergeAggregatorFactory for the merge phase of AVG().

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@967 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 7bef7a9..971af1a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -455,10 +455,10 @@
                                  * Initialize the first output record Reset the
                                  * tuple builder
                                  */
-                                if (!aggregator.initFromPartial(outFrameAppender, fta,
+                                if (!aggregator.init(outFrameAppender, fta,
                                         tupleIndex, aggregateState)) {
                                     flushOutFrame(writer, finalPass);
-                                    if (!aggregator.initFromPartial(outFrameAppender, fta,
+                                    if (!aggregator.init(outFrameAppender, fta,
                                             tupleIndex, aggregateState)) {
                                         throw new HyracksDataException(
                                                 "Failed to append an aggregation result to the output frame.");
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index 4167de1..f3cf2e7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -52,22 +52,6 @@
     public boolean init(FrameTupleAppender appender,
             IFrameTupleAccessor accessor, int tIndex, AggregateState state)
             throws HyracksDataException;
-    
-    /**
-     * Initialize the state based on the partial results.
-     * 
-     * @param accessor
-     * @param tIndex
-     * @param fieldOutput
-     *            The data output for the frame containing the state. This may
-     *            be null, if the state is maintained as a java object
-     * @param state
-     *            The state to be initialized.
-     * @throws HyracksDataException
-     */
-    public boolean initFromPartial(FrameTupleAppender appender,
-            IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-            throws HyracksDataException;
 
     /**
      * Reset the aggregator. The corresponding aggregate state should be reset
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index c42d29a..3cf3d34 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -46,25 +46,6 @@
     public void init(IFrameTupleAccessor accessor, int tIndex,
             DataOutput fieldOutput, AggregateState state)
             throws HyracksDataException;
-    
-    /**
-     * Initialize the state by loading the partial results. This is specified
-     * since for some aggregations (like avg), the partial results and final 
-     * results are different, and different initialization methods should be 
-     * used.
-     * 
-     * @param accessor
-     * @param tIndex
-     * @param fieldOutput
-     *            The data output for the frame containing the state. This may
-     *            be null, if the state is maintained as a java object
-     * @param state
-     *            The state to be initialized.
-     * @throws HyracksDataException
-     */
-    public void initFromPartial(IFrameTupleAccessor accessor, int tIndex,
-            DataOutput fieldOutput, AggregateState state)
-            throws HyracksDataException;
 
     /**
      * Reset the aggregator. The corresponding aggregate state should be reset
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
similarity index 83%
copy from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
copy to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
index d2d1b03..32a7f49 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
@@ -31,7 +31,7 @@
 /**
  *
  */
-public class AvgFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
+public class AvgFieldGroupAggregatorFactory implements IFieldAggregateDescriptorFactory {
     
     private static final long serialVersionUID = 1L;
     
@@ -39,7 +39,7 @@
     
     private final boolean useObjectState;
     
-    public AvgFieldAggregatorFactory(int aggField, boolean useObjectState){
+    public AvgFieldGroupAggregatorFactory(int aggField, boolean useObjectState){
         this.aggField = aggField;
         this.useObjectState = useObjectState;
     }
@@ -184,35 +184,6 @@
                     }
                 };
             }
-
-            @Override
-            public void initFromPartial(IFrameTupleAccessor accessor,
-                    int tIndex, DataOutput fieldOutput, AggregateState state)
-                    throws HyracksDataException {
-                int sum = 0;
-                int count = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-                count += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart + 4);
-                if (!useObjectState) {
-                    try {
-                        fieldOutput.writeInt(sum);
-                        fieldOutput.writeInt(count);
-                    } catch (IOException e) {
-                        throw new HyracksDataException(
-                                "I/O exception when initializing the aggregator.");
-                    }
-                } else {
-                    state.setState(new Integer[]{sum, count});
-                }
-            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
similarity index 84%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
index d2d1b03..8f43dec 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
@@ -31,15 +31,16 @@
 /**
  *
  */
-public class AvgFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
-    
+public class AvgFieldMergeAggregatorFactory implements
+        IFieldAggregateDescriptorFactory {
+
     private static final long serialVersionUID = 1L;
     
     private final int aggField;
     
     private final boolean useObjectState;
     
-    public AvgFieldAggregatorFactory(int aggField, boolean useObjectState){
+    public AvgFieldMergeAggregatorFactory(int aggField, boolean useObjectState) {
         this.aggField = aggField;
         this.useObjectState = useObjectState;
     }
@@ -100,32 +101,6 @@
             }
             
             @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, AggregateState state)
-                    throws HyracksDataException {
-                int sum = 0;
-                int count = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-                count += 1;
-                if (!useObjectState) {
-                    try {
-                        fieldOutput.writeInt(sum);
-                        fieldOutput.writeInt(count);
-                    } catch (IOException e) {
-                        throw new HyracksDataException(
-                                "I/O exception when initializing the aggregator.");
-                    }
-                } else {
-                    state.setState(new Integer[]{sum, count});
-                }
-            }
-            
-            @Override
             public void close() {
                 // TODO Auto-generated method stub
                 
@@ -186,7 +161,7 @@
             }
 
             @Override
-            public void initFromPartial(IFrameTupleAccessor accessor,
+            public void init(IFrameTupleAccessor accessor,
                     int tIndex, DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
                 int sum = 0;
@@ -217,3 +192,4 @@
     }
 
 }
+
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index 657af3b..430284a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -153,13 +153,6 @@
                     state.setState(count);
                 }
             }
-
-            @Override
-            public void initFromPartial(IFrameTupleAccessor accessor,
-                    int tIndex, DataOutput fieldOutput, AggregateState state)
-                    throws HyracksDataException {
-                init(accessor, tIndex, fieldOutput, state);
-            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index 6045687..9c5062f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -181,13 +181,6 @@
                     state.setState(sum);
                 }
             }
-
-            @Override
-            public void initFromPartial(IFrameTupleAccessor accessor,
-                    int tIndex, DataOutput fieldOutput, AggregateState state)
-                    throws HyracksDataException {
-                init(accessor, tIndex, fieldOutput, state);
-            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index 3970e57..9152093 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -231,13 +231,6 @@
                     }
                 };
             }
-
-            @Override
-            public void initFromPartial(IFrameTupleAccessor accessor,
-                    int tIndex, DataOutput fieldOutput, AggregateState state)
-                    throws HyracksDataException {
-                init(accessor, tIndex, fieldOutput, state);
-            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 11d236c..0a002da 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -270,41 +270,6 @@
                     }
                 }
             }
-
-            @Override
-            public boolean initFromPartial(FrameTupleAppender appender,
-                    IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
-                if (!initPending) {
-                    stateTupleBuilder.reset();
-                    for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-                        stateTupleBuilder.addField(accessor, tIndex,
-                        		keyFieldsInPartialResults[i]);
-                    }
-                    DataOutput dos = stateTupleBuilder.getDataOutput();
-
-                    for (int i = 0; i < aggregators.length; i++) {
-                        aggregators[i].initFromPartial(accessor, tIndex, dos,
-                                ((AggregateState[]) state.getState())[i]);
-                        if (aggregateStateFactories[i].hasBinaryState()) {
-                            stateTupleBuilder.addFieldEndOffset();
-                        }
-                    }
-                }
-                // For pre-cluster: no output state is needed
-                if(appender == null){
-                    initPending = false;
-                    return true;
-                }
-                if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
-                        stateTupleBuilder.getByteArray(), 0,
-                        stateTupleBuilder.getSize())) {
-                    initPending = true;
-                    return false;
-                }
-                initPending = false;
-                return true;
-            }
         };
     }
 }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 753cf0c..f171239 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -53,7 +53,8 @@
 import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldMergeAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
@@ -317,7 +318,7 @@
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(1, true),
 								new CountFieldAggregatorFactory(true),
-								new AvgFieldAggregatorFactory(1, true) }),
+								new AvgFieldGroupAggregatorFactory(1, true) }),
 				outputRec, tableSize);
 
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
@@ -370,7 +371,7 @@
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(1, true),
 								new CountFieldAggregatorFactory(true),
-								new AvgFieldAggregatorFactory(1, true) }),
+								new AvgFieldGroupAggregatorFactory(1, true) }),
 				outputRec);
 
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
@@ -427,12 +428,12 @@
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(1, false),
 								new CountFieldAggregatorFactory(false),
-								new AvgFieldAggregatorFactory(1, false) }),
+								new AvgFieldGroupAggregatorFactory(1, false) }),
 				new MultiFieldsAggregatorFactory(
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(1, false),
 								new IntSumFieldAggregatorFactory(2, false),
-								new AvgFieldAggregatorFactory(3, false) }),
+								new AvgFieldMergeAggregatorFactory(3, false) }),
 				outputRec,
 				new HashSpillableTableFactory(
 						new FieldHashPartitionComputerFactory(
@@ -850,7 +851,7 @@
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(1, true),
 								new CountFieldAggregatorFactory(true),
-								new AvgFieldAggregatorFactory(1, true) }),
+								new AvgFieldGroupAggregatorFactory(1, true) }),
 				outputRec, tableSize);
 
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
@@ -904,7 +905,7 @@
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(1, true),
 								new CountFieldAggregatorFactory(true),
-								new AvgFieldAggregatorFactory(1, true) }),
+								new AvgFieldGroupAggregatorFactory(1, true) }),
 				outputRec);
 
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
@@ -964,12 +965,12 @@
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(1, false),
 								new CountFieldAggregatorFactory(false),
-								new AvgFieldAggregatorFactory(1, false) }),
+								new AvgFieldGroupAggregatorFactory(1, false) }),
 				new MultiFieldsAggregatorFactory(
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(2, false),
 								new IntSumFieldAggregatorFactory(3, false),
-								new AvgFieldAggregatorFactory(4, false) }),
+								new AvgFieldMergeAggregatorFactory(4, false) }),
 				outputRec,
 				new HashSpillableTableFactory(
 						new FieldHashPartitionComputerFactory(
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 892ae39..8db3f48 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -230,7 +230,7 @@
 
         switch (alg) {
         case 0: // new external hash graph
-            grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(
+            grouper = new ExternalGroupOperatorDescriptor(
                     spec,
                     keys,
                     framesLimit,
@@ -276,7 +276,7 @@
                             IntegerBinaryHashFunctionFactory.INSTANCE }));
             spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
 
-            grouper = new edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor(
+            grouper = new PreclusteredGroupOperatorDescriptor(
                     spec,
                     keys,
                     new IBinaryComparatorFactory[] {