Update issue #52:

Replaced the aggregate state interface by using a single aggregate state class.

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@864 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
similarity index 64%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
index 169ada7..f9c3b90 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
@@ -19,28 +19,27 @@
 /**
  *
  */
-public interface IAggregateState extends Serializable {
+public class AggregateState implements Serializable {
+    
+    private static final long serialVersionUID = 1L;
+    
+    Object state = null;
 
-	/**
-	 * Return the length of the state if in the frame
-	 * @return
-	 */
-	public int getLength();
-	
-	/**
-	 * Return the state as a java object
-	 * @return
-	 */
-	public Object getState();
-	
-	/**
-	 * Set the state.
-	 * @param obj
-	 */
-	public void setState(Object obj);
-	
-	/**
-	 * Reset the state. 
-	 */
-	public void reset();
+    public void setState(Object obj) {
+        state = null;
+        state = obj;
+    }
+
+    public void reset() {
+        state = null;
+    }
+
+    public Object getState() {
+        return state;
+    }
+
+    public int getLength() {
+        return -1;
+    }
+    
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
index b73cf46..ba538f7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
@@ -274,7 +274,7 @@
                         .createBinaryComparator();
             }
             final IFieldAggregateDescriptor[] currentWorkingAggregators = new IFieldAggregateDescriptor[mergeFactories.length];
-            final IAggregateState[] aggregateStates = new IAggregateState[mergeFactories.length];
+            final AggregateState[] aggregateStates = new AggregateState[mergeFactories.length];
             for (int i = 0; i < currentWorkingAggregators.length; i++) {
                 currentWorkingAggregators[i] = mergeFactories[i]
                         .createAggregator(ctx, recordDescriptors[0],
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
index bdcc2e9..a61fd16 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -66,7 +66,7 @@
 	private final FrameTupleAppender appender;
 	private final List<ByteBuffer> buffers;
 	private final Link[] table;
-	private IAggregateState[][] aggregateStates;
+	private AggregateState[][] aggregateStates;
 	private int accumulatorSize;
 
 	private int lastBIndex;
@@ -93,7 +93,7 @@
 		buffers = new ArrayList<ByteBuffer>();
 		table = new Link[tableSize];
 		
-		this.aggregateStates = new IAggregateState[aggregatorFactories.length][INIT_ACCUMULATORS_SIZE];
+		this.aggregateStates = new AggregateState[aggregatorFactories.length][INIT_ACCUMULATORS_SIZE];
 		accumulatorSize = 0;
 		
 		this.fields = fields;
@@ -176,7 +176,7 @@
 			int sbIndex = lastBIndex;
 			int stIndex = appender.getTupleCount() - 1;
 			for (int i = 0; i < aggregators.length; i++) {
-				IAggregateState aggState = aggregators[i].createState();
+				AggregateState aggState = aggregators[i].createState();
 				aggregators[i].init(accessor, tIndex, null, aggState);
 				if (saIndex >= aggregateStates[i].length) {
 					aggregateStates[i] = Arrays.copyOf(aggregateStates[i],
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
index 332e63e..b3c36e6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
@@ -135,7 +135,7 @@
                 : firstKeyNormalizerFactory.createNormalizedKeyComputer();
 
         final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
-        final IAggregateState[] aggregateStates = new IAggregateState[aggregatorFactories.length];
+        final AggregateState[] aggregateStates = new AggregateState[aggregatorFactories.length];
         for (int i = 0; i < aggregators.length; i++) {
             aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
                     inRecordDescriptor, outRecordDescriptor);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
index 06d5f68..9bceea3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
@@ -29,7 +29,7 @@
      * 
      * @return
      */
-    public IAggregateState createState();
+    public AggregateState createState();
 
     /**
      * Initialize the state based on the input tuple.
@@ -44,7 +44,7 @@
      * @throws HyracksDataException
      */
     public void init(IFrameTupleAccessor accessor, int tIndex,
-            DataOutput fieldOutput, IAggregateState state)
+            DataOutput fieldOutput, AggregateState state)
             throws HyracksDataException;
 
     /**
@@ -55,7 +55,7 @@
      * 
      * @param state
      */
-    public void reset(IAggregateState state);
+    public void reset(AggregateState state);
 
     /**
      * Aggregate the value. Aggregate state should be updated correspondingly.
@@ -72,7 +72,7 @@
      * @throws HyracksDataException
      */
     public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-            byte[] data, int offset, IAggregateState state)
+            byte[] data, int offset, AggregateState state)
             throws HyracksDataException;
 
     /**
@@ -88,7 +88,7 @@
      * @throws HyracksDataException
      */
     public void outputPartialResult(DataOutput fieldOutput, byte[] data,
-            int offset, IAggregateState state) throws HyracksDataException;
+            int offset, AggregateState state) throws HyracksDataException;
 
     /**
      * Output the final aggregation result.
@@ -103,7 +103,7 @@
      * @throws HyracksDataException
      */
     public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-            int offset, IAggregateState state) throws HyracksDataException;
+            int offset, AggregateState state) throws HyracksDataException;
 
     public void close();
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
index 704fe4e..401fc51 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
@@ -23,7 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
 
@@ -51,13 +51,13 @@
         return new IFieldAggregateDescriptor() {
             
             @Override
-            public void reset(IAggregateState state) {
+            public void reset(AggregateState state) {
                 state.reset();
             }
             
             @Override
             public void outputPartialResult(DataOutput fieldOutput, byte[] data,
-                    int offset, IAggregateState state) throws HyracksDataException {
+                    int offset, AggregateState state) throws HyracksDataException {
                 int sum, count;
                 if (data != null) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
@@ -78,7 +78,7 @@
             
             @Override
             public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, IAggregateState state) throws HyracksDataException {
+                    int offset, AggregateState state) throws HyracksDataException {
                 int sum, count;
                 if (data != null) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
@@ -98,7 +98,7 @@
             
             @Override
             public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, IAggregateState state)
+                    DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
                 int sum = 0;
                 int count = 0;
@@ -123,34 +123,8 @@
             }
             
             @Override
-            public IAggregateState createState() {
-                return new IAggregateState() {
-                    
-                    private static final long serialVersionUID = 1L;
-                    
-                    Object state = null;
-                    
-                    @Override
-                    public void setState(Object obj) {
-                        state = null;
-                        state = obj;
-                    }
-                    
-                    @Override
-                    public void reset() {
-                        state = null;
-                    }
-                    
-                    @Override
-                    public Object getState() {
-                        return state;
-                    }
-                    
-                    @Override
-                    public int getLength() {
-                        return 8;
-                    }
-                };
+            public AggregateState createState() {
+                return new AggregateState();
             }
             
             @Override
@@ -161,7 +135,7 @@
             
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, IAggregateState state)
+                    byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 int sum = 0, count = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
index 6760544..df71f47 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
@@ -23,7 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
 
@@ -57,13 +57,13 @@
         return new IFieldAggregateDescriptor() {
             
             @Override
-            public void reset(IAggregateState state) {
+            public void reset(AggregateState state) {
                 state.reset();
             }
             
             @Override
             public void outputPartialResult(DataOutput fieldOutput, byte[] data,
-                    int offset, IAggregateState state) throws HyracksDataException {
+                    int offset, AggregateState state) throws HyracksDataException {
                 int sum, count;
                 if (data != null) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
@@ -84,7 +84,7 @@
             
             @Override
             public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, IAggregateState state) throws HyracksDataException {
+                    int offset, AggregateState state) throws HyracksDataException {
                 int sum, count;
                 if (data != null) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
@@ -104,7 +104,7 @@
             
             @Override
             public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, IAggregateState state)
+                    DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
                 int sum = 0;
                 int count = 0;
@@ -132,34 +132,8 @@
             }
             
             @Override
-            public IAggregateState createState() {
-                return new IAggregateState() {
-                    
-                    private static final long serialVersionUID = 1L;
-                    
-                    Object state = null;
-                    
-                    @Override
-                    public void setState(Object obj) {
-                        state = null;
-                        state = obj;
-                    }
-                    
-                    @Override
-                    public void reset() {
-                        state = null;
-                    }
-                    
-                    @Override
-                    public Object getState() {
-                        return state;
-                    }
-                    
-                    @Override
-                    public int getLength() {
-                        return 8;
-                    }
-                };
+            public AggregateState createState() {
+                return new AggregateState();
             }
             
             @Override
@@ -170,7 +144,7 @@
             
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, IAggregateState state)
+                    byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 int sum = 0, count = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
index 3124ae6..d3b52f3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
@@ -23,7 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
 
@@ -58,13 +58,13 @@
         return new IFieldAggregateDescriptor() {
 
             @Override
-            public void reset(IAggregateState state) {
+            public void reset(AggregateState state) {
                 state.reset();
             }
 
             @Override
             public void outputPartialResult(DataOutput fieldOutput,
-                    byte[] data, int offset, IAggregateState state)
+                    byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 int sum;
                 if (data != null) {
@@ -82,7 +82,7 @@
 
             @Override
             public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, IAggregateState state)
+                    int offset, AggregateState state)
                     throws HyracksDataException {
                 int sum;
                 if (data != null) {
@@ -100,7 +100,7 @@
 
             @Override
             public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, IAggregateState state)
+                    DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
                 int sum = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -122,31 +122,8 @@
             }
 
             @Override
-            public IAggregateState createState() {
-                return new IAggregateState() {
-
-                    private static final long serialVersionUID = 1L;
-
-                    Integer sum = null;
-
-                    public int getLength() {
-                        return 4;
-                    }
-
-                    public Object getState() {
-                        return sum;
-                    }
-
-                    public void setState(Object obj) {
-                        sum = null;
-                        sum = (Integer) obj;
-                    }
-
-                    public void reset() {
-                        sum = null;
-                    }
-
-                };
+            public AggregateState createState() {
+                return new AggregateState();
             }
 
             @Override
@@ -156,7 +133,7 @@
 
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, IAggregateState state)
+                    byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 int sum = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
index 5f8a7c1..f53108a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
@@ -26,7 +26,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
 
@@ -63,13 +63,13 @@
         return new IFieldAggregateDescriptor() {
 
             @Override
-            public void reset(IAggregateState state) {
+            public void reset(AggregateState state) {
                 state.reset();
             }
 
             @Override
             public void outputPartialResult(DataOutput fieldOutput,
-                    byte[] data, int offset, IAggregateState state)
+                    byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 try {
                     if (data != null) {
@@ -87,7 +87,7 @@
 
             @Override
             public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, IAggregateState state)
+                    int offset, AggregateState state)
                     throws HyracksDataException {
                 try {
                     if (data != null) {
@@ -105,7 +105,7 @@
 
             @Override
             public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, IAggregateState state)
+                    DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
@@ -146,34 +146,8 @@
             }
 
             @Override
-            public IAggregateState createState() {
-                return new IAggregateState() {
-
-                    private static final long serialVersionUID = 1L;
-
-                    Object state = null;
-
-                    @Override
-                    public void setState(Object obj) {
-                        state = null;
-                        state = obj;
-                    }
-
-                    @Override
-                    public void reset() {
-                        state = null;
-                    }
-
-                    @Override
-                    public Object getState() {
-                        return state;
-                    }
-
-                    @Override
-                    public int getLength() {
-                        return -1;
-                    }
-                };
+            public AggregateState createState() {
+                return new AggregateState();
             }
 
             @Override
@@ -184,7 +158,7 @@
 
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, IAggregateState state)
+                    byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);