Update issue #52:
Replaced the aggregate state interface by using a single aggregate state class.
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@864 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
similarity index 64%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
index 169ada7..f9c3b90 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
@@ -19,28 +19,27 @@
/**
*
*/
-public interface IAggregateState extends Serializable {
+public class AggregateState implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ Object state = null;
- /**
- * Return the length of the state if in the frame
- * @return
- */
- public int getLength();
-
- /**
- * Return the state as a java object
- * @return
- */
- public Object getState();
-
- /**
- * Set the state.
- * @param obj
- */
- public void setState(Object obj);
-
- /**
- * Reset the state.
- */
- public void reset();
+ public void setState(Object obj) {
+ state = null;
+ state = obj;
+ }
+
+ public void reset() {
+ state = null;
+ }
+
+ public Object getState() {
+ return state;
+ }
+
+ public int getLength() {
+ return -1;
+ }
+
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
index b73cf46..ba538f7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
@@ -274,7 +274,7 @@
.createBinaryComparator();
}
final IFieldAggregateDescriptor[] currentWorkingAggregators = new IFieldAggregateDescriptor[mergeFactories.length];
- final IAggregateState[] aggregateStates = new IAggregateState[mergeFactories.length];
+ final AggregateState[] aggregateStates = new AggregateState[mergeFactories.length];
for (int i = 0; i < currentWorkingAggregators.length; i++) {
currentWorkingAggregators[i] = mergeFactories[i]
.createAggregator(ctx, recordDescriptors[0],
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
index bdcc2e9..a61fd16 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -66,7 +66,7 @@
private final FrameTupleAppender appender;
private final List<ByteBuffer> buffers;
private final Link[] table;
- private IAggregateState[][] aggregateStates;
+ private AggregateState[][] aggregateStates;
private int accumulatorSize;
private int lastBIndex;
@@ -93,7 +93,7 @@
buffers = new ArrayList<ByteBuffer>();
table = new Link[tableSize];
- this.aggregateStates = new IAggregateState[aggregatorFactories.length][INIT_ACCUMULATORS_SIZE];
+ this.aggregateStates = new AggregateState[aggregatorFactories.length][INIT_ACCUMULATORS_SIZE];
accumulatorSize = 0;
this.fields = fields;
@@ -176,7 +176,7 @@
int sbIndex = lastBIndex;
int stIndex = appender.getTupleCount() - 1;
for (int i = 0; i < aggregators.length; i++) {
- IAggregateState aggState = aggregators[i].createState();
+ AggregateState aggState = aggregators[i].createState();
aggregators[i].init(accessor, tIndex, null, aggState);
if (saIndex >= aggregateStates[i].length) {
aggregateStates[i] = Arrays.copyOf(aggregateStates[i],
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
index 332e63e..b3c36e6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
@@ -135,7 +135,7 @@
: firstKeyNormalizerFactory.createNormalizedKeyComputer();
final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
- final IAggregateState[] aggregateStates = new IAggregateState[aggregatorFactories.length];
+ final AggregateState[] aggregateStates = new AggregateState[aggregatorFactories.length];
for (int i = 0; i < aggregators.length; i++) {
aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
inRecordDescriptor, outRecordDescriptor);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
index 06d5f68..9bceea3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
@@ -29,7 +29,7 @@
*
* @return
*/
- public IAggregateState createState();
+ public AggregateState createState();
/**
* Initialize the state based on the input tuple.
@@ -44,7 +44,7 @@
* @throws HyracksDataException
*/
public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, IAggregateState state)
+ DataOutput fieldOutput, AggregateState state)
throws HyracksDataException;
/**
@@ -55,7 +55,7 @@
*
* @param state
*/
- public void reset(IAggregateState state);
+ public void reset(AggregateState state);
/**
* Aggregate the value. Aggregate state should be updated correspondingly.
@@ -72,7 +72,7 @@
* @throws HyracksDataException
*/
public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, IAggregateState state)
+ byte[] data, int offset, AggregateState state)
throws HyracksDataException;
/**
@@ -88,7 +88,7 @@
* @throws HyracksDataException
*/
public void outputPartialResult(DataOutput fieldOutput, byte[] data,
- int offset, IAggregateState state) throws HyracksDataException;
+ int offset, AggregateState state) throws HyracksDataException;
/**
* Output the final aggregation result.
@@ -103,7 +103,7 @@
* @throws HyracksDataException
*/
public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, IAggregateState state) throws HyracksDataException;
+ int offset, AggregateState state) throws HyracksDataException;
public void close();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
index 704fe4e..401fc51 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
@@ -51,13 +51,13 @@
return new IFieldAggregateDescriptor() {
@Override
- public void reset(IAggregateState state) {
+ public void reset(AggregateState state) {
state.reset();
}
@Override
public void outputPartialResult(DataOutput fieldOutput, byte[] data,
- int offset, IAggregateState state) throws HyracksDataException {
+ int offset, AggregateState state) throws HyracksDataException {
int sum, count;
if (data != null) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
@@ -78,7 +78,7 @@
@Override
public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, IAggregateState state) throws HyracksDataException {
+ int offset, AggregateState state) throws HyracksDataException {
int sum, count;
if (data != null) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
@@ -98,7 +98,7 @@
@Override
public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, IAggregateState state)
+ DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
int sum = 0;
int count = 0;
@@ -123,34 +123,8 @@
}
@Override
- public IAggregateState createState() {
- return new IAggregateState() {
-
- private static final long serialVersionUID = 1L;
-
- Object state = null;
-
- @Override
- public void setState(Object obj) {
- state = null;
- state = obj;
- }
-
- @Override
- public void reset() {
- state = null;
- }
-
- @Override
- public Object getState() {
- return state;
- }
-
- @Override
- public int getLength() {
- return 8;
- }
- };
+ public AggregateState createState() {
+ return new AggregateState();
}
@Override
@@ -161,7 +135,7 @@
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, IAggregateState state)
+ byte[] data, int offset, AggregateState state)
throws HyracksDataException {
int sum = 0, count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
index 6760544..df71f47 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
@@ -57,13 +57,13 @@
return new IFieldAggregateDescriptor() {
@Override
- public void reset(IAggregateState state) {
+ public void reset(AggregateState state) {
state.reset();
}
@Override
public void outputPartialResult(DataOutput fieldOutput, byte[] data,
- int offset, IAggregateState state) throws HyracksDataException {
+ int offset, AggregateState state) throws HyracksDataException {
int sum, count;
if (data != null) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
@@ -84,7 +84,7 @@
@Override
public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, IAggregateState state) throws HyracksDataException {
+ int offset, AggregateState state) throws HyracksDataException {
int sum, count;
if (data != null) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
@@ -104,7 +104,7 @@
@Override
public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, IAggregateState state)
+ DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
int sum = 0;
int count = 0;
@@ -132,34 +132,8 @@
}
@Override
- public IAggregateState createState() {
- return new IAggregateState() {
-
- private static final long serialVersionUID = 1L;
-
- Object state = null;
-
- @Override
- public void setState(Object obj) {
- state = null;
- state = obj;
- }
-
- @Override
- public void reset() {
- state = null;
- }
-
- @Override
- public Object getState() {
- return state;
- }
-
- @Override
- public int getLength() {
- return 8;
- }
- };
+ public AggregateState createState() {
+ return new AggregateState();
}
@Override
@@ -170,7 +144,7 @@
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, IAggregateState state)
+ byte[] data, int offset, AggregateState state)
throws HyracksDataException {
int sum = 0, count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
index 3124ae6..d3b52f3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
@@ -58,13 +58,13 @@
return new IFieldAggregateDescriptor() {
@Override
- public void reset(IAggregateState state) {
+ public void reset(AggregateState state) {
state.reset();
}
@Override
public void outputPartialResult(DataOutput fieldOutput,
- byte[] data, int offset, IAggregateState state)
+ byte[] data, int offset, AggregateState state)
throws HyracksDataException {
int sum;
if (data != null) {
@@ -82,7 +82,7 @@
@Override
public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, IAggregateState state)
+ int offset, AggregateState state)
throws HyracksDataException {
int sum;
if (data != null) {
@@ -100,7 +100,7 @@
@Override
public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, IAggregateState state)
+ DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
int sum = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -122,31 +122,8 @@
}
@Override
- public IAggregateState createState() {
- return new IAggregateState() {
-
- private static final long serialVersionUID = 1L;
-
- Integer sum = null;
-
- public int getLength() {
- return 4;
- }
-
- public Object getState() {
- return sum;
- }
-
- public void setState(Object obj) {
- sum = null;
- sum = (Integer) obj;
- }
-
- public void reset() {
- sum = null;
- }
-
- };
+ public AggregateState createState() {
+ return new AggregateState();
}
@Override
@@ -156,7 +133,7 @@
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, IAggregateState state)
+ byte[] data, int offset, AggregateState state)
throws HyracksDataException {
int sum = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
index 5f8a7c1..f53108a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
@@ -26,7 +26,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
@@ -63,13 +63,13 @@
return new IFieldAggregateDescriptor() {
@Override
- public void reset(IAggregateState state) {
+ public void reset(AggregateState state) {
state.reset();
}
@Override
public void outputPartialResult(DataOutput fieldOutput,
- byte[] data, int offset, IAggregateState state)
+ byte[] data, int offset, AggregateState state)
throws HyracksDataException {
try {
if (data != null) {
@@ -87,7 +87,7 @@
@Override
public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, IAggregateState state)
+ int offset, AggregateState state)
throws HyracksDataException {
try {
if (data != null) {
@@ -105,7 +105,7 @@
@Override
public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, IAggregateState state)
+ DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
@@ -146,34 +146,8 @@
}
@Override
- public IAggregateState createState() {
- return new IAggregateState() {
-
- private static final long serialVersionUID = 1L;
-
- Object state = null;
-
- @Override
- public void setState(Object obj) {
- state = null;
- state = obj;
- }
-
- @Override
- public void reset() {
- state = null;
- }
-
- @Override
- public Object getState() {
- return state;
- }
-
- @Override
- public int getLength() {
- return -1;
- }
- };
+ public AggregateState createState() {
+ return new AggregateState();
}
@Override
@@ -184,7 +158,7 @@
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, IAggregateState state)
+ byte[] data, int offset, AggregateState state)
throws HyracksDataException {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);