Removed the initFromPartial() method from the new aggregator interface; added AvgFieldMergeAggregatorFactory for the merge phase of AVG().
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@967 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 7bef7a9..971af1a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -455,10 +455,10 @@
* Initialize the first output record Reset the
* tuple builder
*/
- if (!aggregator.initFromPartial(outFrameAppender, fta,
+ if (!aggregator.init(outFrameAppender, fta,
tupleIndex, aggregateState)) {
flushOutFrame(writer, finalPass);
- if (!aggregator.initFromPartial(outFrameAppender, fta,
+ if (!aggregator.init(outFrameAppender, fta,
tupleIndex, aggregateState)) {
throw new HyracksDataException(
"Failed to append an aggregation result to the output frame.");
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index 4167de1..f3cf2e7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -52,22 +52,6 @@
public boolean init(FrameTupleAppender appender,
IFrameTupleAccessor accessor, int tIndex, AggregateState state)
throws HyracksDataException;
-
- /**
- * Initialize the state based on the partial results.
- *
- * @param accessor
- * @param tIndex
- * @param fieldOutput
- * The data output for the frame containing the state. This may
- * be null, if the state is maintained as a java object
- * @param state
- * The state to be initialized.
- * @throws HyracksDataException
- */
- public boolean initFromPartial(FrameTupleAppender appender,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException;
/**
* Reset the aggregator. The corresponding aggregate state should be reset
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index c42d29a..3cf3d34 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -46,25 +46,6 @@
public void init(IFrameTupleAccessor accessor, int tIndex,
DataOutput fieldOutput, AggregateState state)
throws HyracksDataException;
-
- /**
- * Initialize the state by loading the partial results. This is specified
- * since for some aggregations (like avg), the partial results and final
- * results are different, and different initialization methods should be
- * used.
- *
- * @param accessor
- * @param tIndex
- * @param fieldOutput
- * The data output for the frame containing the state. This may
- * be null, if the state is maintained as a java object
- * @param state
- * The state to be initialized.
- * @throws HyracksDataException
- */
- public void initFromPartial(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, AggregateState state)
- throws HyracksDataException;
/**
* Reset the aggregator. The corresponding aggregate state should be reset
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
similarity index 83%
copy from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
copy to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
index d2d1b03..32a7f49 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
@@ -31,7 +31,7 @@
/**
*
*/
-public class AvgFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
+public class AvgFieldGroupAggregatorFactory implements IFieldAggregateDescriptorFactory {
private static final long serialVersionUID = 1L;
@@ -39,7 +39,7 @@
private final boolean useObjectState;
- public AvgFieldAggregatorFactory(int aggField, boolean useObjectState){
+ public AvgFieldGroupAggregatorFactory(int aggField, boolean useObjectState){
this.aggField = aggField;
this.useObjectState = useObjectState;
}
@@ -184,35 +184,6 @@
}
};
}
-
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, DataOutput fieldOutput, AggregateState state)
- throws HyracksDataException {
- int sum = 0;
- int count = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- count += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart + 4);
- if (!useObjectState) {
- try {
- fieldOutput.writeInt(sum);
- fieldOutput.writeInt(count);
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- } else {
- state.setState(new Integer[]{sum, count});
- }
- }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
similarity index 84%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
index d2d1b03..8f43dec 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
@@ -31,15 +31,16 @@
/**
*
*/
-public class AvgFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
-
+public class AvgFieldMergeAggregatorFactory implements
+ IFieldAggregateDescriptorFactory {
+
private static final long serialVersionUID = 1L;
private final int aggField;
private final boolean useObjectState;
- public AvgFieldAggregatorFactory(int aggField, boolean useObjectState){
+ public AvgFieldMergeAggregatorFactory(int aggField, boolean useObjectState) {
this.aggField = aggField;
this.useObjectState = useObjectState;
}
@@ -100,32 +101,6 @@
}
@Override
- public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, AggregateState state)
- throws HyracksDataException {
- int sum = 0;
- int count = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- count += 1;
- if (!useObjectState) {
- try {
- fieldOutput.writeInt(sum);
- fieldOutput.writeInt(count);
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- } else {
- state.setState(new Integer[]{sum, count});
- }
- }
-
- @Override
public void close() {
// TODO Auto-generated method stub
@@ -186,7 +161,7 @@
}
@Override
- public void initFromPartial(IFrameTupleAccessor accessor,
+ public void init(IFrameTupleAccessor accessor,
int tIndex, DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
int sum = 0;
@@ -217,3 +192,4 @@
}
}
+
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index 657af3b..430284a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -153,13 +153,6 @@
state.setState(count);
}
}
-
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, DataOutput fieldOutput, AggregateState state)
- throws HyracksDataException {
- init(accessor, tIndex, fieldOutput, state);
- }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index 6045687..9c5062f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -181,13 +181,6 @@
state.setState(sum);
}
}
-
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, DataOutput fieldOutput, AggregateState state)
- throws HyracksDataException {
- init(accessor, tIndex, fieldOutput, state);
- }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index 3970e57..9152093 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -231,13 +231,6 @@
}
};
}
-
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, DataOutput fieldOutput, AggregateState state)
- throws HyracksDataException {
- init(accessor, tIndex, fieldOutput, state);
- }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 11d236c..0a002da 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -270,41 +270,6 @@
}
}
}
-
- @Override
- public boolean initFromPartial(FrameTupleAppender appender,
- IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- if (!initPending) {
- stateTupleBuilder.reset();
- for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- stateTupleBuilder.addField(accessor, tIndex,
- keyFieldsInPartialResults[i]);
- }
- DataOutput dos = stateTupleBuilder.getDataOutput();
-
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].initFromPartial(accessor, tIndex, dos,
- ((AggregateState[]) state.getState())[i]);
- if (aggregateStateFactories[i].hasBinaryState()) {
- stateTupleBuilder.addFieldEndOffset();
- }
- }
- }
- // For pre-cluster: no output state is needed
- if(appender == null){
- initPending = false;
- return true;
- }
- if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
- stateTupleBuilder.getByteArray(), 0,
- stateTupleBuilder.getSize())) {
- initPending = true;
- return false;
- }
- initPending = false;
- return true;
- }
};
}
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 753cf0c..f171239 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -53,7 +53,8 @@
import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldMergeAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
@@ -317,7 +318,7 @@
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, true),
new CountFieldAggregatorFactory(true),
- new AvgFieldAggregatorFactory(1, true) }),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
outputRec, tableSize);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
@@ -370,7 +371,7 @@
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, true),
new CountFieldAggregatorFactory(true),
- new AvgFieldAggregatorFactory(1, true) }),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
outputRec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
@@ -427,12 +428,12 @@
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, false),
new CountFieldAggregatorFactory(false),
- new AvgFieldAggregatorFactory(1, false) }),
+ new AvgFieldGroupAggregatorFactory(1, false) }),
new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, false),
new IntSumFieldAggregatorFactory(2, false),
- new AvgFieldAggregatorFactory(3, false) }),
+ new AvgFieldMergeAggregatorFactory(3, false) }),
outputRec,
new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(
@@ -850,7 +851,7 @@
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, true),
new CountFieldAggregatorFactory(true),
- new AvgFieldAggregatorFactory(1, true) }),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
outputRec, tableSize);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
@@ -904,7 +905,7 @@
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, true),
new CountFieldAggregatorFactory(true),
- new AvgFieldAggregatorFactory(1, true) }),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
outputRec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
@@ -964,12 +965,12 @@
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, false),
new CountFieldAggregatorFactory(false),
- new AvgFieldAggregatorFactory(1, false) }),
+ new AvgFieldGroupAggregatorFactory(1, false) }),
new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(2, false),
new IntSumFieldAggregatorFactory(3, false),
- new AvgFieldAggregatorFactory(4, false) }),
+ new AvgFieldMergeAggregatorFactory(4, false) }),
outputRec,
new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 892ae39..8db3f48 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -230,7 +230,7 @@
switch (alg) {
case 0: // new external hash graph
- grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(
+ grouper = new ExternalGroupOperatorDescriptor(
spec,
keys,
framesLimit,
@@ -276,7 +276,7 @@
IntegerBinaryHashFunctionFactory.INSTANCE }));
spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
- grouper = new edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor(
+ grouper = new PreclusteredGroupOperatorDescriptor(
spec,
keys,
new IBinaryComparatorFactory[] {