Update #52: fixed bugs on external grouper on merging phase; fixed bugs on aggregation integration tests.
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@883 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
index 680e7cb..31d406d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
@@ -39,7 +39,6 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -315,7 +314,6 @@
ctx.getFrameSize());
private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
ctx.getFrameSize(), recordDescriptors[0]);
- private ArrayTupleBuilder finalTupleBuilder;
private FrameTupleAppender writerFrameAppender;
public void initialize() throws HyracksDataException {
@@ -450,11 +448,10 @@
* Initialize the first output record Reset the
* tuple builder
*/
-
- if (!aggregator.init(outFrameAppender, fta,
+ if (!aggregator.initFromPartial(outFrameAppender, fta,
tupleIndex, aggregateState)) {
flushOutFrame(writer, finalPass);
- if (!aggregator.init(outFrameAppender, fta,
+ if (!aggregator.initFromPartial(outFrameAppender, fta,
tupleIndex, aggregateState)) {
throw new HyracksDataException(
"Failed to append an aggregation result to the output frame.");
@@ -501,10 +498,6 @@
private void flushOutFrame(IFrameWriter writer, boolean isFinal)
throws HyracksDataException {
- if (finalTupleBuilder == null) {
- finalTupleBuilder = new ArrayTupleBuilder(
- recordDescriptors[0].getFields().length);
- }
if (writerFrame == null) {
writerFrame = ctx.allocateFrame();
}
@@ -516,9 +509,6 @@
outFrameAccessor.reset(outFrame);
for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
- for (int j = 0; j < keyFields.length; j++) {
- finalTupleBuilder.addField(outFrameAccessor, i, j);
- }
if(isFinal){
if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
@@ -539,7 +529,6 @@
}
}
}
- aggregator.reset();
}
if (writerFrameAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(writerFrame, writer);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
index 28a71ac..8c6338d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -65,6 +65,10 @@
private final FrameTupleAppender appender;
private final List<ByteBuffer> buffers;
private final Link[] table;
+ /**
+ * Aggregate states: a list of states for all groups maintained in the main
+ * memory.
+ */
private AggregateState[] aggregateStates;
private int accumulatorSize;
@@ -165,12 +169,15 @@
throw new IllegalStateException();
}
}
- // Add aggregation fields
+ // Add index to the keys in frame
int sbIndex = lastBIndex;
int stIndex = appender.getTupleCount() - 1;
+
+ // Add aggregation fields
AggregateState newState = aggregator.createAggregateStates();
- aggregator.init(appender, accessor, tIndex, newState);
-
+
+ aggregator.init(null, accessor, tIndex, newState);
+
if (accumulatorSize >= aggregateStates.length) {
aggregateStates = Arrays.copyOf(aggregateStates,
aggregateStates.length * 2);
@@ -198,8 +205,9 @@
ByteBuffer keyBuffer = buffers.get(bIndex);
storedKeysAccessor.reset(keyBuffer);
- while (!aggregator.outputFinalResult(appender,
- storedKeysAccessor, tIndex, aggregateStates[aIndex])) {
+ while (!aggregator
+ .outputFinalResult(appender, storedKeysAccessor,
+ tIndex, aggregateStates[aIndex])) {
flushFrame(appender, writer);
}
aggregator.reset();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
index 1299a1e..d13cc5d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
@@ -294,7 +294,6 @@
appender.reset(outFrame, true);
}
}
- aggregator.reset();
} while (true);
}
if (appender.getTupleCount() != 0) {
@@ -344,7 +343,6 @@
}
}
}
- aggregator.reset();
}
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
index 75c546f..3851f26 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
@@ -20,7 +20,12 @@
*
*/
public interface IAggregateStateFactory extends Serializable {
-
+
+ /**
+ * Get the (partial) state length in binary.
+ *
+ * @return
+ */
public int getStateLength();
public Object createState();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
index 176806f..dc2a30e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
@@ -30,6 +30,11 @@
*/
public AggregateState createAggregateStates();
+ /**
+ * Get the length of the binary states.
+ *
+ * @return
+ */
public int getAggregateStatesLength();
/**
@@ -47,6 +52,22 @@
public boolean init(FrameTupleAppender appender,
IFrameTupleAccessor accessor, int tIndex, AggregateState state)
throws HyracksDataException;
+
+ /**
+ * Initialize the state based on the partial results.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param fieldOutput
+ * The data output for the frame containing the state. This may
+ * be null, if the state is maintained as a java object
+ * @param state
+ * The state to be initialized.
+ * @throws HyracksDataException
+ */
+ public boolean initFromPartial(FrameTupleAppender appender,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException;
/**
* Reset the aggregator. The corresponding aggregate state should be reset
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
index 50da4cb..c4df536 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
@@ -27,13 +27,18 @@
public IAggregateStateFactory getAggregateStateFactory();
/**
- * Initialize the state based on the input tuple.
+ * Initialize the state based on the input tuple.
*
* @param accessor
* @param tIndex
* @param fieldOutput
* The data output for the frame containing the state. This may
- * be null, if the state is maintained as a java object
+ * be null, if the state is maintained as a java object.
+ *
+ * Note that we have an assumption that the initialization of
+ * the binary state (if any) inserts the state fields into the
+ * buffer in a appending fashion. This means that an arbitrary
+ * initial size of the state can be accquired.
* @param state
* The state to be initialized.
* @throws HyracksDataException
@@ -43,7 +48,10 @@
throws HyracksDataException;
/**
- * Initialize the state based on the input tuple.
+ * Initialize the state by loading the partial results. This is specified
+ * since for some aggregations (like avg), the partial results and final
+ * results are different, and different initialization methods should be
+ * used.
*
* @param accessor
* @param tIndex
@@ -76,7 +84,11 @@
* @param data
* The buffer containing the state, if frame-based-state is used.
* This means that it can be null if java-object-based-state is
- * used.
+ * used.
+ *
+ * Here the length of binary state can be obtains from the state
+ * parameter, and if the content to be filled into that is over-
+ * flowing (larger than the reversed space), error should be emit.
* @param offset
* @param state
* The aggregate state.
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
index 7db12f7..0695c5a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
@@ -105,13 +105,16 @@
public void init(IFrameTupleAccessor accessor, int tIndex,
DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
+
int sum = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+
sum += IntegerSerializerDeserializer.getInt(accessor
.getBuffer().array(),
tupleOffset + accessor.getFieldSlotsLength()
+ fieldStart);
+
if (!useObjectState) {
try {
fieldOutput.writeInt(sum);
@@ -168,6 +171,7 @@
.getBuffer().array(),
tupleOffset + accessor.getFieldSlotsLength()
+ fieldStart);
+
if (!useObjectState) {
ByteBuffer buf = ByteBuffer.wrap(data);
sum += buf.getInt(offset);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
index b76c285..1ae7aa4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -42,10 +42,11 @@
private final int aggField;
private final boolean isMax;
-
+
private final boolean hasBinaryState;
- public MinMaxStringFieldAggregatorFactory(int aggField, boolean isMax, boolean hasBinaryState) {
+ public MinMaxStringFieldAggregatorFactory(int aggField, boolean isMax,
+ boolean hasBinaryState) {
this.aggField = aggField;
this.isMax = isMax;
this.hasBinaryState = hasBinaryState;
@@ -76,9 +77,10 @@
throws HyracksDataException {
try {
if (hasBinaryState) {
- int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
+ int stateIdx = IntegerSerializerDeserializer.getInt(
+ data, offset);
Object[] storedState = (Object[]) state.getState();
- fieldOutput.writeUTF((String)storedState[stateIdx]);
+ fieldOutput.writeUTF((String) storedState[stateIdx]);
} else {
fieldOutput.writeUTF((String) state.getState());
}
@@ -94,13 +96,11 @@
throws HyracksDataException {
try {
if (hasBinaryState) {
- int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
+ int stateIdx = IntegerSerializerDeserializer.getInt(
+ data, offset);
Object[] storedState = (Object[]) state.getState();
- fieldOutput.writeUTF((String)storedState[stateIdx]);
+ fieldOutput.writeUTF((String) storedState[stateIdx]);
} else {
- if(((String)state.getState()).equalsIgnoreCase("ic platelets lose carefully. blithely unu")){
- System.out.print("");
- }
fieldOutput.writeUTF((String) state.getState());
}
} catch (IOException e) {
@@ -124,11 +124,13 @@
+ fieldStart, fieldLength)));
if (hasBinaryState) {
// Object-binary-state
- Object[] storedState = (Object[]) state.getState();
- if (storedState == null) {
+ Object[] storedState;
+ if (state.getState() == null) {
storedState = new Object[8];
storedState[0] = new Integer(0);
state.setState(storedState);
+ } else {
+ storedState = (Object[]) state.getState();
}
int stateCount = (Integer) (storedState[0]);
if (stateCount + 1 >= storedState.length) {
@@ -173,6 +175,7 @@
if (hasBinaryState) {
int stateIdx = IntegerSerializerDeserializer.getInt(data,
offset);
+
Object[] storedState = (Object[]) state.getState();
if (isMax) {
@@ -204,24 +207,24 @@
@Override
public IAggregateStateFactory getAggregateStateFactory() {
return new IAggregateStateFactory() {
-
+
private static final long serialVersionUID = 1L;
@Override
public boolean hasObjectState() {
return true;
}
-
+
@Override
public boolean hasBinaryState() {
return hasBinaryState;
}
-
+
@Override
public int getStateLength() {
return 4;
}
-
+
@Override
public Object createState() {
return null;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
index fcc961f..c3c98cc 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
@@ -82,7 +82,7 @@
return new IAggregatorDescriptor() {
- private boolean pending;
+ private boolean initPending, outputPending;
@Override
public void reset() {
@@ -91,14 +91,15 @@
aggregateStateFactories[i] = aggregators[i]
.getAggregateStateFactory();
}
- pending = false;
+ initPending = false;
+ outputPending = false;
}
@Override
public boolean outputPartialResult(FrameTupleAppender appender,
IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- if (!pending) {
+ if (!outputPending) {
resultTupleBuilder.reset();
for (int i = 0; i < keyFields.length; i++) {
resultTupleBuilder.addField(accessor, tIndex,
@@ -121,9 +122,10 @@
if (!appender.append(resultTupleBuilder.getFieldEndOffsets(),
resultTupleBuilder.getByteArray(), 0,
resultTupleBuilder.getSize())) {
- pending = true;
+ outputPending = true;
return false;
}
+ outputPending = false;
return true;
}
@@ -132,7 +134,7 @@
public boolean outputFinalResult(FrameTupleAppender appender,
IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- if (!pending) {
+ if (!outputPending) {
resultTupleBuilder.reset();
for (int i = 0; i < keyFields.length; i++) {
resultTupleBuilder.addField(accessor, tIndex,
@@ -160,9 +162,10 @@
if (!appender.append(resultTupleBuilder.getFieldEndOffsets(),
resultTupleBuilder.getByteArray(), 0,
resultTupleBuilder.getSize())) {
- pending = true;
+ outputPending = true;
return false;
}
+ outputPending = false;
return true;
}
@@ -170,7 +173,7 @@
public boolean init(FrameTupleAppender appender,
IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- if (!pending) {
+ if (!initPending) {
stateTupleBuilder.reset();
for (int i = 0; i < keyFields.length; i++) {
stateTupleBuilder.addField(accessor, tIndex,
@@ -188,14 +191,16 @@
}
// For pre-cluster: no output state is needed
if(appender == null){
+ initPending = false;
return true;
}
if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
stateTupleBuilder.getByteArray(), 0,
stateTupleBuilder.getSize())) {
- pending = true;
+ initPending = true;
return false;
}
+ initPending = false;
return true;
}
@@ -261,6 +266,41 @@
}
}
}
+
+ @Override
+ public boolean initFromPartial(FrameTupleAppender appender,
+ IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ if (!initPending) {
+ stateTupleBuilder.reset();
+ for (int i = 0; i < keyFields.length; i++) {
+ stateTupleBuilder.addField(accessor, tIndex,
+ keyFields[i]);
+ }
+ DataOutput dos = stateTupleBuilder.getDataOutput();
+
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].initFromPartial(accessor, tIndex, dos,
+ ((AggregateState[]) state.getState())[i]);
+ if (aggregateStateFactories[i].hasBinaryState()) {
+ stateTupleBuilder.addFieldEndOffset();
+ }
+ }
+ }
+ // For pre-cluster: no output state is needed
+ if(appender == null){
+ initPending = false;
+ return true;
+ }
+ if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
+ stateTupleBuilder.getByteArray(), 0,
+ stateTupleBuilder.getSize())) {
+ initPending = true;
+ return false;
+ }
+ initPending = false;
+ return true;
+ }
};
}
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 6ebf22d..d89709f 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -239,7 +239,7 @@
IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
- int frameLimits = 3;
+ int frameLimits = 4;
int tableSize = 8;
ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
@@ -253,7 +253,7 @@
new IntSumFieldAggregatorFactory(3, false) }),
new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(3, false) }),
+ new IntSumFieldAggregatorFactory(2, false) }),
outputRec,
new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(
@@ -408,7 +408,7 @@
FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
- int frameLimits = 3;
+ int frameLimits = 4;
int tableSize = 8;
ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
@@ -423,8 +423,8 @@
new AvgFieldAggregatorFactory(1, false) }),
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, false),
- new CountFieldAggregatorFactory(false),
- new AvgFieldAggregatorFactory(2, false) }),
+ new IntSumFieldAggregatorFactory(2, false),
+ new AvgFieldAggregatorFactory(3, false) }),
outputRec,
new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(
@@ -577,7 +577,7 @@
UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
- int frameLimits = 3;
+ int frameLimits = 4;
int tableSize = 8;
ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(