Fixed a bug in the newly refactored aggregators that caused frames to be missed in the merging phase.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_spilling_groupby_perf@393 123451ca-8445-de46-9d55-352943316053
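
For context: this refactor replaces the old accessor-pair API, where aggregate() and merge() operated on two frame tuples, with a byte-oriented one. Both methods now receive the serialized partial result as a byte[] range, mutate it in place, and return the number of bytes they consumed, which is what allows MultiAggregatorDescriptorFactory to lay several aggregate fields end to end in one tuple. Below is a minimal sketch of a fixed-width aggregator under the new contract, assuming the interface exactly as revised in this patch; the class name and field indices are illustrative only.

import java.io.IOException;
import java.nio.ByteBuffer;

import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

public class IntSumSketch implements IAggregatorDescriptor {
    private final int aggField; // input field to sum (illustrative)
    private final int outField; // output field holding the partial sum (illustrative)

    public IntSumSketch(int aggField, int outField) {
        this.aggField = aggField;
        this.outField = outField;
    }

    // Absolute start of a field: skip the per-field offset slots (2 bytes each)
    // at the head of the tuple -- the "2 * fieldCount" seen throughout this patch.
    private static int fieldStart(IFrameTupleAccessor acc, int tIndex, int field) {
        return acc.getTupleStartOffset(tIndex) + 2 * acc.getFieldCount()
                + acc.getFieldStartOffset(tIndex, field);
    }

    @Override
    public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
            throws HyracksDataException {
        // The caller has already added the group keys; append the initial state.
        int v = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
                fieldStart(accessor, tIndex, aggField));
        tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, v);
    }

    @Override
    public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
            throws HyracksDataException {
        int v = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
                fieldStart(accessor, tIndex, aggField));
        ByteBuffer buf = ByteBuffer.wrap(data);
        buf.putInt(offset, buf.getInt(offset) + v); // update the state in place
        return 4; // bytes consumed, so the next aggregator knows where to start
    }

    @Override
    public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
            throws HyracksDataException {
        // Merging a spilled partial is the same in-place add, except the value
        // comes from the partial-result field of the run tuple.
        int v = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
                fieldStart(accessor, tIndex, outField));
        ByteBuffer buf = ByteBuffer.wrap(data);
        buf.putInt(offset, buf.getInt(offset) + v);
        return 4;
    }

    @Override
    public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
            throws HyracksDataException {
        try {
            tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
                    fieldStart(accessor, tIndex, outField), 4);
            tupleBuilder.addFieldEndOffset();
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }

    @Override
    public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
            throws HyracksDataException {
        // For a sum the final result is identical to the partial result.
        outputPartialResult(accessor, tIndex, tupleBuilder);
    }

    @Override
    public void reset() {
        // All state lives in the frames; nothing to clear here.
    }

    @Override
    public void close() {
    }
}

Variable-length states (like concat) cannot be updated in place this way; as the first file below shows, they instead keep a 4-byte reference into aggregator-private buffers.
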
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
index 486fffa..7baa384 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
@@ -20,7 +20,6 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.io.DataOutput;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
@@ -29,12 +28,12 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
/**
* @author jarodwen
*/
public class ConcatAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
private static final long serialVersionUID = 1L;
private static final int INIT_ACCUMULATORS_SIZE = 8;
@@ -58,8 +57,6 @@
if (this.outField < 0)
this.outField = keyFields.length;
- final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-
return new IAggregatorDescriptor() {
byte[][] buf = new byte[INIT_ACCUMULATORS_SIZE][];
@@ -68,47 +65,55 @@
int aggregatorCount = 0;
@Override
- public void merge(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
+ public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
throws HyracksDataException {
- int tupleOffset = accessor2.getTupleStartOffset(tIndex2);
- int fieldCount = accessor2.getFieldCount();
- int fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
- int fieldLength = accessor2.getFieldLength(tIndex2, outField);
// FIXME Should be done in binary way
+ int bufIndex = IntegerSerializerDeserializer.getInt(data, offset);
StringBuilder sbder = new StringBuilder();
- sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
- new ByteArrayInputStream(accessor2.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart, fieldLength))));
+ if (bufIndex < 0) {
+ // Need to allocate a new buffer slot for this group
+ currentAggregatorIndex++;
+ aggregatorCount++;
+ if (currentAggregatorIndex >= buf.length) {
+ // Grow the slot array, as init() does, before using the new index
+ byte[][] newBuf = new byte[buf.length * 2][];
+ for (int i = 0; i < buf.length; i++) {
+ newBuf[i] = buf[i];
+ }
+ this.buf = newBuf;
+ }
+ bufIndex = currentAggregatorIndex;
+ sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(data, offset + 4, length - 4))));
+ } else {
+ sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(buf[bufIndex], 0, buf[bufIndex].length))));
+ }
+
+ // Read the 2-byte (big-endian) length header of the serialized UTF-8 field
+ int utfLength = ((data[offset + 4] & 0xff) << 8) + (data[offset + 5] & 0xff);
// Get the new data
- tupleOffset = accessor1.getTupleStartOffset(tIndex1);
- fieldCount = accessor1.getFieldCount();
- fieldStart = accessor1.getFieldStartOffset(tIndex1, outField);
- fieldLength = accessor1.getFieldLength(tIndex1, outField);
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+ int fieldLength = accessor.getFieldLength(tIndex, outField);
sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
- new ByteArrayInputStream(accessor1.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart, fieldLength))));
+ new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ + fieldStart + 4, fieldLength - 4))));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
UTF8StringSerializerDeserializer.INSTANCE.serialize(sbder.toString(), new DataOutputStream(baos));
- ByteBuffer wrapBuf = accessor2.getBuffer();
- wrapBuf.position(tupleOffset + 2 * fieldCount + fieldStart);
- wrapBuf.put(baos.toByteArray());
+ buf[bufIndex] = baos.toByteArray();
+
+ // Update the ref index (use a distinct name to avoid shadowing the buf field)
+ ByteBuffer refBuf = ByteBuffer.wrap(data, offset, 4);
+ refBuf.putInt(bufIndex);
+ return 4 + 2 + utfLength;
}
@Override
- public boolean init(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
throws HyracksDataException {
- tb.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, keyFields[i]);
- }
// Initialize the aggregation value
int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int tupleEndOffset = accessor.getTupleEndOffset(tIndex);
int fieldCount = accessor.getFieldCount();
int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
+ int fieldLength = accessor.getFieldLength(tIndex, concatField);
int appendOffset = tupleOffset + 2 * fieldCount + fieldStart;
// Get the initial value
currentAggregatorIndex++;
@@ -119,149 +124,100 @@
}
this.buf = newBuf;
}
- buf[currentAggregatorIndex] = new byte[tupleEndOffset - appendOffset + 1];
- System.arraycopy(accessor.getBuffer().array(), appendOffset, buf[currentAggregatorIndex], 0,
- tupleEndOffset - appendOffset + 1);
+ buf[currentAggregatorIndex] = new byte[fieldLength];
+ System.arraycopy(accessor.getBuffer().array(), appendOffset, buf[currentAggregatorIndex], 0, fieldLength);
// Update the aggregator index
aggregatorCount++;
- tb.addField(IntegerSerializerDeserializer.INSTANCE, currentAggregatorIndex);
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-
- return false;
+ try {
+ tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentAggregatorIndex);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- return true;
+ }
+
+ @Override
+ public void reset() {
+ currentAggregatorIndex = -1;
+ aggregatorCount = 0;
}
@Override
public void close() {
currentAggregatorIndex = -1;
aggregatorCount = 0;
+ for (int i = 0; i < buf.length; i++) {
+ buf[i] = null;
+ }
}
@Override
- public void aggregate(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
throws HyracksDataException {
- int tupleOffset = accessor2.getTupleStartOffset(tIndex2);
- int fieldCount = accessor2.getFieldCount();
- int fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
- int refIndex = IntegerSerializerDeserializer.getInt(accessor2.getBuffer().array(), tupleOffset + 2
- * fieldCount + fieldStart);
+ int refIndex = IntegerSerializerDeserializer.getInt(data, offset);
// FIXME Should be done in binary way
StringBuilder sbder = new StringBuilder();
sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
new ByteArrayInputStream(buf[refIndex]))));
// Get the new data
- tupleOffset = accessor1.getTupleStartOffset(tIndex1);
- fieldCount = accessor1.getFieldCount();
- fieldStart = accessor1.getFieldStartOffset(tIndex1, concatField);
- int fieldLength = accessor1.getFieldLength(tIndex1, concatField);
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
+ int fieldLength = accessor.getFieldLength(tIndex, concatField);
sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
- new ByteArrayInputStream(accessor1.getBuffer().array(), tupleOffset + 2 * fieldCount
+ new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ fieldStart, fieldLength))));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
UTF8StringSerializerDeserializer.INSTANCE.serialize(sbder.toString(), new DataOutputStream(baos));
buf[refIndex] = baos.toByteArray();
+ return 4;
}
@Override
- public boolean outputMergeResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
throws HyracksDataException {
- // Construct the tuple using keys and sum value
- tb.reset();
-
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, i);
- }
- tb.addField(accessor, tIndex, outField);
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
- }
-
- @Override
- public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
- throws HyracksDataException {
- // Construct the tuple using keys and sum value
- tb.reset();
-
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, i);
- }
-
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldCount = accessor.getFieldCount();
int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2
* fieldCount + fieldStart);
- tb.addField(buf[refIndex], 0, buf[refIndex].length);
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
- }
-
- @Override
- public void getInitValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
- int fieldLength = accessor.getFieldLength(tIndex, concatField);
-
- // Get the initial value
- currentAggregatorIndex++;
- if (currentAggregatorIndex >= buf.length) {
- byte[][] newBuf = new byte[buf.length * 2][];
- for (int i = 0; i < buf.length; i++) {
- newBuf[i] = buf[i];
+ try {
+ if (refIndex >= 0)
+ tupleBuilder.getDataOutput().write(buf[refIndex]);
+ else {
+ int fieldLength = accessor.getFieldLength(tIndex, outField);
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount + fieldStart
+ + 4, fieldLength - 4);
}
- this.buf = newBuf;
- }
- buf[currentAggregatorIndex] = new byte[fieldLength];
- System.arraycopy(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount + fieldStart,
- buf[currentAggregatorIndex], 0, fieldLength);
- // Update the aggregator index
- aggregatorCount++;
-
- try {
- dataOutput.writeInt(currentAggregatorIndex);
+ tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException();
}
}
@Override
- public void getMergeOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
throws HyracksDataException {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, outField);
int fieldLength = accessor.getFieldLength(tIndex, outField);
+ int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2
+ * fieldCount + fieldOffset);
+
try {
- dataOutput.write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount + fieldStart,
- fieldLength);
- } catch (IOException e) {
- throw new HyracksDataException();
- }
- }
-
- @Override
- public void getPartialOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
- int bufIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2
- * fieldCount + fieldStart);
- try {
- dataOutput.write(buf[bufIndex]);
+ tupleBuilder.getDataOutput().writeInt(-1);
+ if (refIndex < 0) {
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + fieldCount * 2 + fieldOffset + 4, fieldLength - 4);
+ } else {
+ tupleBuilder.getDataOutput().write(buf[refIndex], 0, buf[refIndex].length);
+ }
+ tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException();
}
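
The concat aggregator above depends on a layout convention for its partial-result field: the first four bytes are a reference index, where a non-negative value points into the aggregator's in-memory buf[] slots and a negative value (outputPartialResult() writes -1) means the serialized string follows inline. A hedged decoding sketch of that convention; the helper class and method names are illustrative, not part of this patch:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;

import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;

// Decode a concat partial-result field laid out as
//   [ int refIndex ][ serialized UTF-8 string, authoritative only when refIndex < 0 ]
public final class ConcatPartialResultSketch {
    private ConcatPartialResultSketch() {
    }

    public static String read(byte[] field, int offset, int length, byte[][] buf)
            throws HyracksDataException {
        int refIndex = IntegerSerializerDeserializer.getInt(field, offset);
        if (refIndex >= 0) {
            // The running value lives in the aggregator's slot array.
            return UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
                    new ByteArrayInputStream(buf[refIndex])));
        }
        // Spilled partial: the string is carried inline after the -1 marker.
        return UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
                new ByteArrayInputStream(field, offset + 4, length - 4)));
    }
}

This is also why merge() above allocates a fresh slot when it sees a negative index: a spilled partial re-enters memory at merge time and becomes slot-resident again.
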
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
index 190ae53..f3648ab 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.dataflow.std.aggregators;
-import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -23,7 +22,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
/**
@@ -49,7 +47,6 @@
@Override
public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
- final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
if (this.outField < 0) {
this.outField = keyFields.length;
@@ -57,21 +54,9 @@
return new IAggregatorDescriptor() {
@Override
- public boolean init(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
throws HyracksDataException {
- // Construct the tuple using keys and sum value
- tb.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, keyFields[i]);
- }
- // Insert the aggregation value
- tb.addField(IntegerSerializerDeserializer.INSTANCE, 1);
-
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
+ tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, 1);
}
@Override
@@ -79,89 +64,48 @@
}
@Override
- public void aggregate(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
throws HyracksDataException {
- int count = 0;
-
- int tupleOffset = accessor2.getTupleStartOffset(tIndex2);
- int fieldCount = accessor2.getFieldCount();
- int fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
- count += IntegerSerializerDeserializer.getInt(accessor2.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart) + 1;
- // Update the value of tuple 2
- ByteBuffer buf = accessor2.getBuffer();
- buf.position(tupleOffset + 2 * fieldCount + fieldStart);
- buf.putInt(count);
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ int count = buf.getInt(offset);
+ buf.putInt(offset, count + 1);
+ return 4;
}
@Override
- public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
throws HyracksDataException {
- // Construct the tuple using keys and sum value
- tb.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, i);
- }
- // Insert the aggregation value
- tb.addField(accessor, tIndex, outField);
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
- }
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
- @Override
- public boolean outputMergeResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
- throws HyracksDataException {
- // Construct the tuple using keys and sum value
- tb.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, i);
- }
- // Insert the aggregation value
- tb.addField(accessor, tIndex, outField);
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
- }
-
- @Override
- public void merge(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
- throws HyracksDataException {
- int count = 0;
- // Get value from tuple 1
- int tupleOffset = accessor1.getTupleStartOffset(tIndex1);
- int fieldCount = accessor1.getFieldCount();
- int fieldStart = accessor1.getFieldStartOffset(tIndex1, outField);
- count += IntegerSerializerDeserializer.getInt(accessor1.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- // Get value from tuple 2
- tupleOffset = accessor2.getTupleStartOffset(tIndex2);
- fieldCount = accessor2.getFieldCount();
- fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
- count += IntegerSerializerDeserializer.getInt(accessor2.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- // Update the value of tuple 2
- ByteBuffer buf = accessor2.getBuffer();
- buf.position(tupleOffset + 2 * fieldCount + fieldStart);
- buf.putInt(count);
- }
-
- @Override
- public void getInitValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException {
try {
- dataOutput.writeInt(1);
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ + fieldStart, 4);
+ tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
- throw new HyracksDataException();
+ throw new HyracksDataException("Failed to write int sum as a partial result.");
}
}
@Override
- public void getPartialOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+ try {
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ + fieldStart, 4);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write int sum as a partial result.");
+ }
+ }
+
+ @Override
+ public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
throws HyracksDataException {
int count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -169,29 +113,17 @@
int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ fieldStart);
-
- try {
- dataOutput.writeInt(count);
- } catch (IOException e) {
- throw new HyracksDataException();
- }
+ // Merge the incoming count into the partial result bytes
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ count += buf.getInt(offset);
+ buf.putInt(offset, count);
+ return 4;
}
-
- @Override
- public void getMergeOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException {
- int count = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
- count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- try {
- dataOutput.writeInt(count);
- } catch (IOException e) {
- throw new HyracksDataException();
- }
+ @Override
+ public void reset() {
+ // Nothing to reset: the running count lives entirely in the frames
}
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
index b3eb90f..1dd837b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
@@ -14,143 +14,87 @@
*/
package edu.uci.ics.hyracks.dataflow.std.aggregators;
-import java.io.DataOutput;
-
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
public interface IAggregatorDescriptor {
/**
- * Initialize the aggregator for the group indicated by the input tuple. Keys,
- * and the aggregation values will be written out onto the frame wrapped in the appender.
+ * Initialize the aggregator with the input tuple specified by the given frame and tuple index.
+ * The initialized partial result is written out through the tuple builder.
*
* @param accessor
- * The frame containing the input tuple.
* @param tIndex
- * The tuple index in the frame.
- * @param appender
- * The output frame.
+ * @param tupleBuilder
+ * @throws HyracksDataException
+ */
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException;
+
+ /**
+ * Aggregate the input tuple into the partial result stored in the given byte range. The new
+ * value is written back into the same byte range.
+ *
+ * It is the developer's responsibility to ensure that the new result does not exceed the
+ * given byte range.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param data
+ * @param offset
+ * @param length
* @return
* @throws HyracksDataException
*/
- public boolean init(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
throws HyracksDataException;
/**
- * Get the initial aggregation value from the input tuple.
- * Compared with {@link #init(IFrameTupleAccessor, int, FrameTupleAppender)}, instead of
- * writing the value to a frame appender, the value will be written through the given {@link DataOutput}.
- *
- * @param accessor
- * The frame containing the input tuple.
- * @param tIndex
- * The tuple index in the frame.
- * @param dataOutput
- * The data output wrapper.
- * @throws HyracksDataException
- */
- public void getInitValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException;
-
- /**
- * Aggregate the input tuple with the given partial aggregation value. The partial aggregation value will
- * be updated after the aggregation.
- *
- * @param accessor1
- * The frame containing the input tuple to be aggregated.
- * @param tIndex1
- * The tuple index in the frame.
- * @param accessor2
- * The frame containing the partial aggregation value.
- * @param tIndex2
- * The tuple index of the partial aggregation value. Note that finally this value will be
- * updated to be the new aggregation value.
- * @throws HyracksDataException
- */
- public void aggregate(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
- throws HyracksDataException;
-
- /**
- * Merge the partial aggregation value with another partial aggregation value. After merging, the aggregation
- * value of the second partial aggregator will be updated.
- *
- * @param accessor1
- * The frame containing the tuple of partial aggregation value to be aggregated.
- * @param tIndex1
- * The index of the tuple of partial aggregation value to be aggregated.
- * @param accessor2
- * The frame containing the tuple of partial aggregation value to be updated.
- * @param tIndex2
- * The index of the tuple of partial aggregation value to be updated. Note that after merging,
- * this value will be updated to be the new aggregation value.
- */
- public void merge(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
- throws HyracksDataException;
-
- /**
- * Output the partial aggregation result into a frame appender. The outputted result can be used to
- * aggregate a new tuple from the input frame.
- * This method, together with the {@link #outputPartialResult(IFrameTupleAccessor, int, FrameTupleAppender)},
- * is for the case when different processing logics are applied to partial aggregation result and the merged
- * aggregation result.
- * For example, in an aggregator for variable-length aggregation results, aggregation values should be maintained
- * inside of the aggregators, instead of the frames. A reference will be used to indicate the aggregation value
- * in the partial aggregation result, while the actual aggregation value will be used in the merged aggregation
- * result.
+ * Merge two partial aggregation results. The merged value is then written back into the
+ * given byte range.
*
* @param accessor
* @param tIndex
- * @param appender
+ * @param data
+ * @param offset
+ * @param length
* @return
* @throws HyracksDataException
*/
- public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+ public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
throws HyracksDataException;
/**
- * Output the merged aggregation result into a frame appender. The outputted result can be used to
- * merge another partial aggregation result.
- * See {@link #outputPartialResult(IFrameTupleAccessor, int, FrameTupleAppender)} for the difference
- * between these two methods.
+ * Output the partial aggregation result to an array tuple builder. Any additional information
+ * needed to continue the aggregation must be maintained as part of the partial result.
+ *
+ * For example, an aggregator computing AVG must maintain both the count and the current
+ * average in its partial result.
+ *
* @param accessor
* @param tIndex
- * @param appender
+ * @param tupleBuilder
+ * @throws HyracksDataException
+ */
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException;
+
+ /**
+ * Output the final aggregation result to an array tuple builder.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param tupleBuilder
* @return
* @throws HyracksDataException
*/
- public boolean outputMergeResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
throws HyracksDataException;
- /**
- * Get the partial aggregation value from the merged aggregator indicated by the frame and the tuple index.
- * Compared with {@link #outputPartialResult(IFrameTupleAccessor, int, FrameTupleAppender)}, this will output the value
- * through the given {@link DataOutput}.
- *
- * @param accessor
- * @param tIndex
- * @param dataOutput
- * @throws HyracksDataException
- */
- public void getPartialOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException;
+ public void reset();
/**
- * Get the merged aggregation value from the merged aggregator indicated by the frame and the tuple index.
- * Compared with {@link #outputMergeResult(IFrameTupleAccessor, int, FrameTupleAppender)}, this will output the value
- * through the given {@link DataOutput}.
- *
- * @param accessor
- * @param tIndex
- * @param dataOutput
- * @throws HyracksDataException
- */
- public void getMergeOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException;
-
- /**
* Close the aggregator. Necessary clean-up code should be implemented here.
*/
public void close();
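
To make the revised contract concrete, here is a sketch of the caller-side protocol it implies, mirroring how the spillable tables below drive it; the parameter wiring and the append-to-frame step are assumed to come from the surrounding operator:

import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

final class AggregatorProtocolSketch {
    static void insertOrAggregate(IAggregatorDescriptor aggregator, int[] keyFields,
            IFrameTupleAccessor accessor, int tIndex, boolean foundGroup,
            IFrameTupleAccessor storedAccessor, int stIndex, ArrayTupleBuilder tupleBuilder)
            throws HyracksDataException {
        if (!foundGroup) {
            // New group: build the keys, then let init() append the initial state.
            tupleBuilder.reset();
            for (int i = 0; i < keyFields.length; i++) {
                tupleBuilder.addField(accessor, tIndex, keyFields[i]);
            }
            aggregator.init(accessor, tIndex, tupleBuilder);
            // ... append tupleBuilder's tuple to the group table's frame ...
        } else {
            // Existing group: hand aggregate() the raw byte range of the stored
            // tuple's aggregate fields so it can update them in place.
            int tupleOffset = storedAccessor.getTupleStartOffset(stIndex);
            int fieldCount = storedAccessor.getFieldCount();
            int aggStart = tupleOffset + 2 * fieldCount
                    + storedAccessor.getFieldStartOffset(stIndex, keyFields.length);
            int aggLength = storedAccessor.getTupleEndOffset(stIndex) - aggStart;
            aggregator.aggregate(accessor, tIndex, storedAccessor.getBuffer().array(), aggStart, aggLength);
        }
    }
}
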
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
index 28a5c79..23a90d6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
@@ -16,7 +16,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.io.DataOutput;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
@@ -24,7 +23,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
/**
* @author jarodwen
@@ -49,8 +47,6 @@
public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
- final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-
if (this.outField < 0) {
this.outField = keyFields.length;
}
@@ -58,105 +54,7 @@
return new IAggregatorDescriptor() {
@Override
- public boolean init(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
- throws HyracksDataException {
- // Construct the tuple using keys and sum value
- tb.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, keyFields[i]);
- }
- // Insert the aggregation value
- tb.addField(accessor, tIndex, aggField);
-
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
- throws HyracksDataException {
- int sum = 0;
- int tupleOffset = accessor1.getTupleStartOffset(tIndex1);
- int fieldCount = accessor1.getFieldCount();
- int fieldStart = accessor1.getFieldStartOffset(tIndex1, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor1.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- tupleOffset = accessor2.getTupleStartOffset(tIndex2);
- fieldCount = accessor2.getFieldCount();
- fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
- sum += IntegerSerializerDeserializer.getInt(accessor2.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- // Update the value of tuple 2
- ByteBuffer buf = accessor2.getBuffer();
- buf.position(tupleOffset + 2 * fieldCount + fieldStart);
- buf.putInt(sum);
- }
-
- @Override
- public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
- throws HyracksDataException {
- // Construct the tuple using keys and sum value
- tb.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, i);
- }
- // Insert the aggregation value
- tb.addField(accessor, tIndex, outField);
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
- }
-
- @Override
- public boolean outputMergeResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
- throws HyracksDataException {
- // Construct the tuple using keys and sum value
- tb.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, i);
- }
- // Insert the aggregation value
- tb.addField(accessor, tIndex, outField);
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
- }
-
- @Override
- public void merge(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
- throws HyracksDataException {
- int sum = 0;
- // Get value from tuple 1
- int tupleOffset = accessor1.getTupleStartOffset(tIndex1);
- int fieldCount = accessor1.getFieldCount();
- int fieldStart = accessor1.getFieldStartOffset(tIndex1, outField);
- sum += IntegerSerializerDeserializer.getInt(accessor1.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- // Get value from tuple 2
- tupleOffset = accessor2.getTupleStartOffset(tIndex2);
- fieldCount = accessor2.getFieldCount();
- fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
- sum += IntegerSerializerDeserializer.getInt(accessor2.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- // Update the value of tuple 2
- ByteBuffer buf = accessor2.getBuffer();
- buf.position(tupleOffset + 2 * fieldCount + fieldStart);
- buf.putInt(sum);
- }
-
- @Override
- public void getInitValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
throws HyracksDataException {
int sum = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -165,15 +63,63 @@
sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ fieldStart);
+ tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, sum);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+ throws HyracksDataException {
+ int sum = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ + fieldStart);
+ // Update the sum stored in the partial result bytes
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ sum += buf.getInt(offset);
+ buf.putInt(offset, sum);
+ return 4;
+ }
+
+ @Override
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
try {
- dataOutput.writeInt(sum);
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ + fieldStart, 4);
+ tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
- throw new HyracksDataException();
+ throw new HyracksDataException("Failed to write int sum as a partial result.");
}
}
@Override
- public void getPartialOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+ try {
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ + fieldStart, 4);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write int sum as a partial result.");
+ }
+ }
+
+ @Override
+ public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
throws HyracksDataException {
int sum = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -181,29 +127,17 @@
int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ fieldStart);
-
- try {
- dataOutput.writeInt(sum);
- } catch (IOException e) {
- throw new HyracksDataException();
- }
+ // Write the merged sum back into the partial result bytes
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ sum += buf.getInt(offset);
+ buf.putInt(offset, sum);
+ return 4;
}
-
- @Override
- public void getMergeOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException {
- int sum = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
- sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- try {
- dataOutput.writeInt(sum);
- } catch (IOException e) {
- throw new HyracksDataException();
- }
+ @Override
+ public void reset() {
+ // Nothing to reset: the running sum lives entirely in the frames
}
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
index ef34569..e02087b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
@@ -14,14 +14,12 @@
*/
package edu.uci.ics.hyracks.dataflow.std.aggregators;
-import java.io.DataOutput;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
/**
* @author jarodwen
@@ -30,7 +28,7 @@
public class MultiAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
-
+
private final IAggregatorDescriptorFactory[] aggregatorFactories;
public MultiAggregatorDescriptorFactory(IAggregatorDescriptorFactory[] aggregatorFactories) {
@@ -45,7 +43,6 @@
final RecordDescriptor inRecordDescriptor, final RecordDescriptor outRecordDescriptor, final int[] keyFields)
throws HyracksDataException {
- final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
final IAggregatorDescriptor[] aggregators = new IAggregatorDescriptor[this.aggregatorFactories.length];
for (int i = 0; i < aggregators.length; i++) {
aggregators[i] = aggregatorFactories[i].createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
@@ -55,78 +52,47 @@
return new IAggregatorDescriptor() {
@Override
- public boolean init(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
throws HyracksDataException {
- tb.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, keyFields[i]);
- }
- DataOutput dos = tb.getDataOutput();
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].getInitValue(accessor, tIndex, dos);
- tb.addFieldEndOffset();
- }
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].aggregate(accessor1, tIndex1, accessor2, tIndex2);
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].init(accessor, tIndex, tupleBuilder);
}
}
@Override
- public void merge(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
throws HyracksDataException {
+ int adjust = 0;
for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].merge(accessor1, tIndex1, accessor2, tIndex2);
+ adjust += aggregators[i].aggregate(accessor, tIndex, data, offset + adjust, length - adjust);
+ }
+ return adjust;
+ }
+
+ @Override
+ public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+ throws HyracksDataException {
+ int adjust = 0;
+ for (int i = 0; i < aggregators.length; i++) {
+ adjust += aggregators[i].merge(accessor, tIndex, data, offset + adjust, length - adjust);
+ }
+ return adjust;
+ }
+
+ @Override
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].outputPartialResult(accessor, tIndex, tupleBuilder);
}
}
@Override
- public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
throws HyracksDataException {
- // Construct the tuple using keys and sum value
- tb.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, i);
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].outputResult(accessor, tIndex, tupleBuilder);
}
- DataOutput dos = tb.getDataOutput();
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].getPartialOutputValue(accessor, tIndex, dos);
- tb.addFieldEndOffset();
- }
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
- }
-
- @Override
- public boolean outputMergeResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
- throws HyracksDataException {
- // Construct the tuple using keys and sum value
- tb.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tb.addField(accessor, tIndex, i);
- }
- DataOutput dos = tb.getDataOutput();
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].getMergeOutputValue(accessor, tIndex, dos);
- tb.addFieldEndOffset();
- }
- // Write the tuple out
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- return false;
- }
- return true;
}
@Override
@@ -137,30 +103,12 @@
}
@Override
- public void getInitValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].getInitValue(accessor, tIndex, dataOutput);
- }
- }
-
- @Override
- public void getPartialOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].getPartialOutputValue(accessor, tIndex, dataOutput);
- }
- }
-
- @Override
- public void getMergeOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].getMergeOutputValue(accessor, tIndex, dataOutput);
+ public void reset() {
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].reset();
}
}
};
}
-
}
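
The pass-through implementation above works because every aggregate()/merge() call reports the width of the slice it consumed, so the next aggregator's slice starts immediately after it. A framework-free toy illustration of that width-chaining convention (all names here are illustrative):

import java.nio.ByteBuffer;

public class ChainingSketch {
    // Slot 0 of the packed partial result: a count; returns the 4 bytes consumed.
    static int bumpCount(byte[] data, int offset) {
        ByteBuffer b = ByteBuffer.wrap(data);
        b.putInt(offset, b.getInt(offset) + 1);
        return 4;
    }

    // Slot 1: a running sum fed one input value; returns the 4 bytes consumed.
    static int addToSum(byte[] data, int offset, int value) {
        ByteBuffer b = ByteBuffer.wrap(data);
        b.putInt(offset, b.getInt(offset) + value);
        return 4;
    }

    public static void main(String[] args) {
        byte[] partial = new byte[8]; // [count][sum], laid end to end
        int adjust = 0;
        adjust += bumpCount(partial, adjust);
        adjust += addToSum(partial, adjust, 42);
        ByteBuffer b = ByteBuffer.wrap(partial);
        System.out.println("count=" + b.getInt(0) + " sum=" + b.getInt(4)); // count=1 sum=42
    }
}
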
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/BSTSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/BSTSpillableGroupingTableFactory.java
index ea486f1..fdf4b5e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/BSTSpillableGroupingTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/BSTSpillableGroupingTableFactory.java
@@ -26,6 +26,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -35,7 +36,6 @@
/**
* @author jarodwen
- *
*/
public class BSTSpillableGroupingTableFactory implements ISpillableTableFactory {
@@ -61,8 +61,7 @@
storedKeys[i] = i;
storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
}
- final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
- outRecordDescriptor);
+ final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
@@ -75,6 +74,8 @@
final ByteBuffer outFrame = ctx.allocateFrame();
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+
return new ISpillableTable() {
/**
@@ -130,13 +131,21 @@
}
if (!foundGroup) {
- // Did not find the aggregator. Insert a new aggregator entry
- if(!aggregator.init(accessor, tIndex, appender)){
- if(!nextAvailableFrame()){
+ // If no matching group is found, create a new aggregator
+ // Create a tuple for the new group
+ tupleBuilder.reset();
+ for (int i = 0; i < keyFields.length; i++) {
+ tupleBuilder.addField(accessor, tIndex, keyFields[i]);
+ }
+ aggregator.init(accessor, tIndex, tupleBuilder);
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ if (!nextAvailableFrame()) {
return false;
} else {
- if(!aggregator.init(accessor, tIndex, appender)){
- throw new IllegalStateException("Failed to init an aggergator.");
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException("Failed to init an aggregator");
}
}
}
@@ -145,7 +154,13 @@
// Add entry into the binary search tree
bstreeAdd(sbIndex, stIndex, parentPtr, isSmaller);
} else {
- aggregator.aggregate(accessor, tIndex, storedKeysAccessor1, stIndex);
+ // If a matching group is found, aggregate directly into its stored bytes
+ int tupleOffset = storedKeysAccessor1.getTupleStartOffset(stIndex);
+ int fieldCount = storedKeysAccessor1.getFieldCount();
+ int aggFieldOffset = storedKeysAccessor1.getFieldStartOffset(stIndex, keyFields.length);
+ int tupleEnd = storedKeysAccessor1.getTupleEndOffset(stIndex);
+ aggregator.aggregate(accessor, tIndex, storedKeysAccessor1.getBuffer().array(), tupleOffset + 2
+ * fieldCount + aggFieldOffset, tupleEnd - (tupleOffset + 2 * fieldCount + aggFieldOffset));
}
return true;
}
@@ -161,11 +176,11 @@
}
@Override
- public void flushFrames(IFrameWriter writer) throws HyracksDataException {
+ public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
writer.open();
appender.reset(outFrame, true);
- traversalBSTree(writer, 0, appender);
+ traversalBSTree(writer, 0, appender, isPartial);
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
}
@@ -211,20 +226,33 @@
return true;
}
- private void traversalBSTree(IFrameWriter writer, int nodePtr, FrameTupleAppender appender)
+ private void traversalBSTree(IFrameWriter writer, int nodePtr, FrameTupleAppender appender, boolean isPartial)
throws HyracksDataException {
if (nodePtr < 0 || nodePtr >= bstreeSize)
return;
- traversalBSTree(writer, bstree[nodePtr + 2], appender);
+ traversalBSTree(writer, bstree[nodePtr + 2], appender, isPartial);
int bIndex = bstree[nodePtr];
int tIndex = bstree[nodePtr + 1];
storedKeysAccessor1.reset(frames.get(bIndex));
- while (!aggregator.outputPartialResult(storedKeysAccessor1, tIndex, appender)) {
+ // Reset the tuple for the partial result
+ tupleBuilder.reset();
+ for (int k = 0; k < keyFields.length; k++) {
+ tupleBuilder.addField(storedKeysAccessor1, tIndex, k);
+ }
+ if (isPartial)
+ aggregator.outputPartialResult(storedKeysAccessor1, tIndex,
+ tupleBuilder);
+ else
+ aggregator.outputResult(storedKeysAccessor1, tIndex,
+ tupleBuilder);
+ while (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
+ 0, tupleBuilder.getSize())) {
FrameUtils.flushFrame(outFrame, writer);
appender.reset(outFrame, true);
}
- traversalBSTree(writer, bstree[nodePtr + 3], appender);
+
+ traversalBSTree(writer, bstree[nodePtr + 3], appender, isPartial);
}
private void bstreeAdd(int bufferIdx, int tIndex, int parentPtr, boolean isSmaller) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 0527e9e..8b0d8a9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -51,7 +52,6 @@
/**
* @author jarodwen
- *
*/
public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -81,9 +81,11 @@
private final ISpillableTableFactory spillableTableFactory;
+ private final boolean isOutputSorted;
+
public ExternalGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory) {
+ RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
super(spec, 1, 1);
this.framesLimit = framesLimit;
if (framesLimit <= 1) {
@@ -96,6 +98,7 @@
this.keyFields = keyFields;
this.comparatorFactories = comparatorFactories;
this.spillableTableFactory = spillableTableFactory;
+ this.isOutputSorted = isOutputSorted;
// Set the record descriptor. Note that since
// this operator is a unary operator,
@@ -203,7 +206,7 @@
writer.open();
try {
gTable.sortFrames();
- gTable.flushFrames(writer);
+ gTable.flushFrames(writer, true);
} catch (Exception ex) {
throw new HyracksDataException(ex);
} finally {
@@ -242,6 +245,9 @@
for (int i = 0; i < keyFields.length; ++i) {
storedKeys[i] = i;
}
+ // Tuple builder
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
+
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
/**
* Input frames, one for each run file.
@@ -251,7 +257,7 @@
/**
* Output frame.
*/
- private ByteBuffer outFrame;
+ private ByteBuffer outFrame, writerFrame;
/**
* List of the run files to be merged
@@ -266,6 +272,9 @@
private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
recordDescriptors[0]);
+ private ArrayTupleBuilder finalTupleBuilder;
+ private FrameTupleAppender writerFrameAppender;
+
@SuppressWarnings("unchecked")
public void initialize() throws HyracksDataException {
runs = (LinkedList<RunFileReader>) env.get(RUNS);
@@ -274,7 +283,9 @@
if (runs.size() <= 0) {
ISpillableTable gTable = (ISpillableTable) env.get(GROUPTABLES);
if (gTable != null) {
- gTable.flushFrames(writer);
+ if (isOutputSorted)
+ gTable.sortFrames();
+ gTable.flushFrames(writer, false);
}
env.set(GROUPTABLES, null);
} else {
@@ -304,15 +315,14 @@
IFrameWriter writer = this.writer;
boolean finalPass = false;
- if (runs.size() + 1 <= framesLimit) {
+ if (runs.size() + 2 <= framesLimit) {
// All in-frames can be fit into memory, so no run file
// will be produced and this will be the final pass.
finalPass = true;
- // Remove the unnecessary frames. Note that we only need
- // frames to contain all inFrames but no extra out frame,
- // since the output is directly outputed to the writer.
- for (int i = inFrames.size() - 1; i >= runs.size(); i--)
- inFrames.remove(i);
+ // Remove the unnecessary frames.
+ while (inFrames.size() > runs.size()) {
+ inFrames.remove(inFrames.size() - 1);
+ }
} else {
// Files need to be merged.
newRun = ctx.getJobletContext().createWorkspaceFile(
@@ -357,51 +367,57 @@
}
// Start merging
- int keyIndexInOutFrame = -1;
-
while (!topTuples.areRunsExhausted()) {
// Get the top record
ReferenceEntry top = topTuples.peek();
int tupleIndex = top.getTupleIndex();
int runIndex = topTuples.peek().getRunid();
FrameTupleAccessor fta = top.getAccessor();
- if (keyIndexInOutFrame < 0) {
+ int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+ if (currentTupleInOutFrame < 0
+ || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
// Initialize the first output record
- if (!currentWorkingAggregator.outputMergeResult(fta, tupleIndex, outFrameAppender)) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- if (!currentWorkingAggregator.outputMergeResult(fta, tupleIndex, outFrameAppender))
- throw new IllegalStateException("Failed to flush a merge frame!");
+ // Reset the tuple builder
+ tupleBuilder.reset();
+
+ for (int i = 0; i < keyFields.length; i++) {
+ tupleBuilder.addField(fta, tupleIndex, i);
}
- keyIndexInOutFrame = outFrameAccessor.getTupleCount() - 1;
+
+ currentWorkingAggregator.outputPartialResult(fta, tupleIndex, tupleBuilder);
+
+ if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ // Make sure that when the outFrame is being flushed, all results in it are in
+ // the correct state
+ flushOutFrame(writer, finalPass);
+ if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()))
+ throw new HyracksDataException(
+ "Failed to append an aggregation result to the output frame.");
+ }
} else {
- if (compareFrameTuples(fta, tupleIndex, outFrameAccessor, keyIndexInOutFrame) == 0) {
- // if new tuple is in the same group of the current aggregator
- // do merge and output to the outFrame
- currentWorkingAggregator.merge(fta, tupleIndex, outFrameAccessor,
- keyIndexInOutFrame);
- } else {
- // otherwise, create a new output record in the outFrame
- if (!currentWorkingAggregator.outputMergeResult(fta, tupleIndex, outFrameAppender)) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- if (!currentWorkingAggregator.outputMergeResult(fta, tupleIndex,
- outFrameAppender))
- throw new IllegalStateException("Failed to flush a merge frame!");
- }
- keyIndexInOutFrame = outFrameAccessor.getTupleCount() - 1;
- }
+ // if new tuple is in the same group of the current aggregator
+ // do merge and output to the outFrame
+ int tupleOffset = outFrameAccessor.getTupleStartOffset(currentTupleInOutFrame);
+ int fieldCount = outFrameAccessor.getFieldCount();
+ int fieldOffset = outFrameAccessor.getFieldStartOffset(currentTupleInOutFrame,
+ keyFields.length);
+ int fieldLength = outFrameAccessor.getFieldLength(currentTupleInOutFrame,
+ keyFields.length);
+ currentWorkingAggregator.merge(fta, tupleIndex, outFrameAccessor.getBuffer().array(),
+ tupleOffset + 2 * fieldCount + fieldOffset, fieldLength);
+
}
tupleIndices[runIndex]++;
setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
}
- // After processing all records, flush the aggregator
- currentWorkingAggregator.close();
// Flush the outFrame
if (outFrameAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
+ flushOutFrame(writer, finalPass);
}
+ // After processing all records, close the aggregator
+ currentWorkingAggregator.close();
// Remove the processed run files
runs.subList(0, inFrames.size()).clear();
// insert the new run file into the beginning of the run
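For orientation, the loop above is a k-way merge with on-the-fly grouping. The following standalone sketch (plain Java collections rather than the Hyracks frame API; all names are illustrative) shows the same control flow: pop the smallest tuple across the sorted runs, compare it with the last emitted group, and either start a new group or merge in place.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class GroupMergeSketch {
        // Merges runs of {key, value} pairs, each sorted by key, summing values
        // per key. The heap plays the role of the ReferencedPriorityQueue and
        // the output list plays the role of the outFrame.
        static List<int[]> merge(List<Iterator<int[]>> runs) {
            PriorityQueue<int[]> heap =          // entries are {key, value, runId}
                    new PriorityQueue<>(Comparator.comparingInt(e -> e[0]));
            for (int r = 0; r < runs.size(); r++) {
                if (runs.get(r).hasNext()) {
                    int[] t = runs.get(r).next();
                    heap.add(new int[] { t[0], t[1], r });
                }
            }
            List<int[]> out = new ArrayList<>();
            while (!heap.isEmpty()) {
                int[] top = heap.poll();
                int last = out.size() - 1;
                if (last < 0 || out.get(last)[0] != top[0]) {
                    out.add(new int[] { top[0], top[1] });  // new group: copy key, init state
                } else {
                    out.get(last)[1] += top[1];             // same group: merge in place
                }
                Iterator<int[]> run = runs.get(top[2]);     // advance the source run
                if (run.hasNext()) {
                    int[] t = run.next();
                    heap.add(new int[] { t[0], t[1], top[2] });
                }
            }
            return out;
        }

        public static void main(String[] args) {
            List<Iterator<int[]>> runs = List.of(
                    List.of(new int[] { 1, 2 }, new int[] { 3, 1 }).iterator(),
                    List.of(new int[] { 1, 5 }, new int[] { 2, 4 }).iterator());
            for (int[] g : merge(runs)) {
                System.out.println(g[0] + " -> " + g[1]);   // 1 -> 7, 2 -> 4, 3 -> 1
            }
        }
    }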
@@ -416,6 +432,46 @@
}
}
+ private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
+ if (isFinal) {
+ if (finalTupleBuilder == null) {
+ finalTupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
+ }
+ if (writerFrame == null) {
+ writerFrame = ctx.allocateFrame();
+ }
+ if (writerFrameAppender == null) {
+ writerFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ writerFrameAppender.reset(writerFrame, true);
+ }
+ outFrameAccessor.reset(outFrame);
+ for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+ finalTupleBuilder.reset();
+ for (int j = 0; j < keyFields.length; j++) {
+ finalTupleBuilder.addField(outFrameAccessor, i, j);
+ }
+ currentWorkingAggregator.outputResult(outFrameAccessor, i, finalTupleBuilder);
+
+ if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerFrameAppender.reset(writerFrame, true);
+ if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize()))
+ throw new HyracksDataException(
+ "Failed to write final aggregation result to a writer frame!");
+ }
+ }
+ if (writerFrameAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerFrameAppender.reset(writerFrame, true);
+ }
+ } else {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ outFrameAppender.reset(outFrame, true);
+ }
+
private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
throws HyracksDataException {
@@ -530,4 +586,5 @@
}
}
+
}
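A recurring idiom in this revision (in flushOutFrame above and in the spillable table below) is append-flush-retry. Condensed, with appender, frame, writer, and a loaded tupleBuilder assumed to be in scope:

    // Try to append; a false return means the frame is full. Flush it, reset
    // the appender, and retry once -- a second failure means the tuple cannot
    // fit even into an empty frame, so fail loudly instead of looping.
    if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
            tupleBuilder.getSize())) {
        FrameUtils.flushFrame(frame, writer);
        appender.reset(frame, true);
        if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                tupleBuilder.getSize())) {
            throw new HyracksDataException("Tuple does not fit into an empty frame.");
        }
    }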
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
index 4e6470f..4108adf 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -37,7 +38,6 @@
/**
* @author jarodwen
- *
*/
public class HashSpillableGroupingTableFactory implements ISpillableTableFactory {
@@ -50,14 +50,11 @@
private final int tableSize;
- private final boolean isSorted;
-
- public HashSpillableGroupingTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize, boolean isSorted) {
+ public HashSpillableGroupingTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize) {
this.tpcf = tpcf;
this.tableSize = tableSize;
- this.isSorted = isSorted;
}
-
+
/* (non-Javadoc)
* @see edu.uci.ics.hyracks.dataflow.std.group.ISpillableTableFactory#buildSpillableTable(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, int[], edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory[], edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int)
*/
@@ -73,7 +70,7 @@
storedKeys[i] = i;
storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
}
-
+
final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
final FrameTupleAccessor storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
@@ -92,6 +89,8 @@
final ByteBuffer outFrame = ctx.allocateFrame();
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+
return new ISpillableTable() {
private int dataFrameCount;
@@ -154,11 +153,19 @@
// Do insert
if (!foundGroup) {
// If no matching group is found, create a new aggregator
- if(!aggregator.init(accessor, tIndex, appender)){
- if(!nextAvailableFrame()){
+ // Create a tuple for the new group
+ tupleBuilder.reset();
+ for (int i = 0; i < keyFields.length; i++) {
+ tupleBuilder.addField(accessor, tIndex, keyFields[i]);
+ }
+ aggregator.init(accessor, tIndex, tupleBuilder);
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ if (!nextAvailableFrame()) {
return false;
} else {
- if(!aggregator.init(accessor, tIndex, appender)){
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
throw new IllegalStateException("Failed to init an aggregator");
}
}
@@ -167,12 +174,17 @@
sbIndex = dataFrameCount;
stIndex = appender.getTupleCount() - 1;
link.add(sbIndex, stIndex);
-
+
} else {
- // If there is a matching found, do aggregation directly
+ // If a matching group is found, aggregate directly into its stored state
- aggregator.aggregate(accessor, tIndex, storedKeysAccessor1, stIndex);
+ int tupleOffset = storedKeysAccessor1.getTupleStartOffset(stIndex);
+ int fieldCount = storedKeysAccessor1.getFieldCount();
+ int aggFieldOffset = storedKeysAccessor1.getFieldStartOffset(stIndex, keyFields.length);
+ int tupleEnd = storedKeysAccessor1.getTupleEndOffset(stIndex);
+ aggregator.aggregate(accessor, tIndex, storedKeysAccessor1.getBuffer().array(), tupleOffset + 2
+ * fieldCount + aggFieldOffset, tupleEnd - (tupleOffset + 2 * fieldCount + aggFieldOffset));
}
-
+
return true;
}
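The expression tupleOffset + 2 * fieldCount + aggFieldOffset above encodes the frame layout this code relies on: a tuple starts with one two-byte field-end-offset slot per field, followed by the concatenated field data, so a field's absolute data offset is the tuple start plus the slot array plus the field's relative start. A worked instance with assumed numbers:

    int tupleOffset = 100;     // tuple starts at byte 100 of the frame (assumed)
    int fieldCount = 3;        // slot array occupies 3 fields x 2 bytes = 6 bytes
    int aggFieldOffset = 8;    // aggregate field starts 8 bytes into the data region
    // First data byte of the aggregate field:
    int dataStart = tupleOffset + 2 * fieldCount + aggFieldOffset;   // 100 + 6 + 8 = 114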
@@ -187,13 +199,12 @@
}
@Override
- public void flushFrames(IFrameWriter writer) throws HyracksDataException {
+ public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
writer.open();
appender.reset(outFrame, true);
- if(isSorted)
- sortFrames();
+
if (tPointers == null) {
// Not sorted
for (int i = 0; i < table.length; ++i) {
@@ -203,7 +214,19 @@
int bIndex = link.pointers[j];
int tIndex = link.pointers[j + 1];
storedKeysAccessor1.reset(frames.get(bIndex));
- while (!aggregator.outputPartialResult(storedKeysAccessor1, tIndex, appender)) {
+ // Rebuild the output tuple (partial or final, depending on isPartial)
+ tupleBuilder.reset();
+ for (int k = 0; k < keyFields.length; k++) {
+ tupleBuilder.addField(storedKeysAccessor1, tIndex, k);
+ }
+ if (isPartial)
+ aggregator.outputPartialResult(storedKeysAccessor1, tIndex,
+ tupleBuilder);
+ else
+ aggregator.outputResult(storedKeysAccessor1, tIndex,
+ tupleBuilder);
+ while (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
+ 0, tupleBuilder.getSize())) {
FrameUtils.flushFrame(outFrame, writer);
appender.reset(outFrame, true);
}
@@ -227,10 +250,24 @@
storedKeysAccessor1.reset(buffer);
// Insert
- if (!aggregator.outputPartialResult(storedKeysAccessor1, tupleIndex, appender)) {
+ // Rebuild the output tuple (partial or final, depending on isPartial)
+ tupleBuilder.reset();
+ for (int k = 0; k < keyFields.length; k++) {
+ tupleBuilder.addField(storedKeysAccessor1, tupleIndex, k);
+ }
+ if (isPartial)
+ aggregator.outputPartialResult(storedKeysAccessor1, tupleIndex,
+ tupleBuilder);
+ else
+ aggregator.outputResult(storedKeysAccessor1, tupleIndex,
+ tupleBuilder);
+
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
+ 0, tupleBuilder.getSize())) {
FrameUtils.flushFrame(outFrame, writer);
appender.reset(outFrame, true);
- if (!aggregator.outputPartialResult(storedKeysAccessor1, tupleIndex, appender)) {
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
+ 0, tupleBuilder.getSize())) {
throw new IllegalStateException();
}
}
@@ -425,4 +462,5 @@
return sb.toString();
}
}
+
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
index 462e570..05f985f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
@@ -37,5 +37,5 @@
public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
- public void flushFrames(IFrameWriter writer) throws HyracksDataException;
+ public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException;
}
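A hypothetical caller, to make the new flag concrete (the table and the two writers are assumed to exist):

    table.flushFrames(runWriter, true);      // spilling a run: keep results partial
    table.flushFrames(outputWriter, false);  // final pass: emit finished aggregates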
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/benchmarking/BenchmarkingDataGenTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/benchmarking/BenchmarkingDataGenTests.java
index 1831a4d..4d4dce2 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/benchmarking/BenchmarkingDataGenTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/benchmarking/BenchmarkingDataGenTests.java
@@ -157,8 +157,7 @@
new CountAggregatorDescriptorFactory(1), new IntSumAggregatorDescriptorFactory(1, 2),
new ConcatAggregatorDescriptorFactory(4, 3) }), outRecordDescriptor,
new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize,
- true));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalGroupRefactorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalGroupRefactorTest.java
index 4c9eede..eea510c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalGroupRefactorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalGroupRefactorTest.java
@@ -54,16 +54,23 @@
import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableGroupingTableFactory;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;

/**
* @author jarodwen
- *
*/
public class ExternalGroupRefactorTest extends AbstractIntegrationTest {
+
final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+ static final String outSplitsPrefix = System.getProperty("java.io.tmpdir");
+
+ static final String outSplits1 = "nc1:" + outSplitsPrefix + "aggregation_";
+ static final String outSplits2 = "nc2:" + outSplitsPrefix + "aggregation_";
+
+ static final boolean isOutputFile = true;
+
final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
@@ -82,6 +89,33 @@
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, }, '|');
+ private static FileSplit[] parseFileSplits(String fileSplits) {
+ String[] splits = fileSplits.split(",");
+ FileSplit[] fSplits = new FileSplit[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ String s = splits[i].trim();
+ int idx = s.indexOf(':');
+ if (idx < 0) {
+ throw new IllegalArgumentException("File split " + s + " not well formed");
+ }
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ }
+ return fSplits;
+ }
+
+ private static AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, boolean isFile,
+ String prefix) {
+ AbstractSingleActivityOperatorDescriptor printer;
+
+ if (!isFile)
+ printer = new PrinterOperatorDescriptor(spec);
+ else
+ printer = new PlainFileWriterOperatorDescriptor(spec, new ConstantFileSplitProvider(
+ parseFileSplits(outSplits1 + prefix + ".nc1, " + outSplits2 + prefix + ".nc2")), "\t");
+
+ return printer;
+ }
+
@Test
public void hashSingleKeyScalarGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
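For reference, the split string handed to parseFileSplits above is a comma-separated list of node:path pairs, so a prefix of "test" expands roughly as follows (the actual paths depend on java.io.tmpdir):

    // "nc1:/tmp/aggregation_test.nc1, nc2:/tmp/aggregation_test.nc2"
    // -> FileSplit(nc1, /tmp/aggregation_test.nc1), FileSplit(nc2, /tmp/aggregation_test.nc2)
    FileSplit[] splits = parseFileSplits(outSplits1 + "test" + ".nc1, " + outSplits2 + "test" + ".nc2");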
@@ -92,9 +126,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE
- });
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int frameLimits = 3;
@@ -105,7 +137,7 @@
new IntSumAggregatorDescriptorFactory(1), outputRec, new HashSpillableGroupingTableFactory(
new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize, true));
+ tableSize), true);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -113,8 +145,10 @@
new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashSingleKeyScalarGroupTest");
+
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -141,7 +175,7 @@
ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new IntSumAggregatorDescriptorFactory(1), outputRec, new BSTSpillableGroupingTableFactory());
+ new IntSumAggregatorDescriptorFactory(1), outputRec, new BSTSpillableGroupingTableFactory(), true);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -150,7 +184,7 @@
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile, "BSTSingleKeyScalarGroupTest");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -182,7 +216,7 @@
UTF8StringBinaryComparatorFactory.INSTANCE }, new IntSumAggregatorDescriptorFactory(1),
outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize, true));
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -191,7 +225,8 @@
UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashMultipleKeyScalarGroupTest");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -225,7 +260,7 @@
new IntSumAggregatorDescriptorFactory(2, 3) }), outputRec,
new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize, true));
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -234,7 +269,8 @@
UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashMultipleKeyMultipleScalarGroupTest");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -261,18 +297,21 @@
int tableSize = 8;
ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, new ConcatAggregatorDescriptorFactory(9),
- outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize, true));
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new ConcatAggregatorDescriptorFactory(9), outputRec, new HashSpillableGroupingTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashMultipleKeyNonScalarGroupTest");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -281,7 +320,7 @@
spec.addRoot(printer);
runTest(spec);
}
-
+
@Test
public void hashMultipleKeyMultipleFieldsGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
@@ -293,7 +332,8 @@
RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE});
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0, 9 };
int frameLimits = 3;
@@ -303,10 +343,11 @@
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
UTF8StringBinaryComparatorFactory.INSTANCE }, new MultiAggregatorDescriptorFactory(
new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(1, 2),
- new IntSumAggregatorDescriptorFactory(2, 3), new ConcatAggregatorDescriptorFactory(9, 4) }), outputRec,
+ new IntSumAggregatorDescriptorFactory(2, 3),
+ new ConcatAggregatorDescriptorFactory(9, 4) }), outputRec,
new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize, true));
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -315,7 +356,8 @@
UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashMultipleKeyMultipleFieldsGroupTest");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);