Merged -r 502:525 from hyracks_dev_next into branch
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@527 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
index f40e8cd..c433bf9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
@@ -67,7 +67,6 @@
String[] opParts = null;
if (!jas.allocations.containsKey(oid)) {
Set<ConstraintExpression> opConstraints = jas.opConstraints.get(oid);
- System.err.println("Constraints: " + opConstraints);
for (ConstraintExpression ce : opConstraints) {
int nParts = getNumPartitions(oid, ce);
if (nParts != -1) {
@@ -137,12 +136,9 @@
}
private int getNumPartitions(OperatorDescriptorId oid, ConstraintExpression ce) {
- System.err.println(ce);
if (ce.getTag() == ExpressionTag.RELATIONAL) {
RelationalExpression re = (RelationalExpression) ce;
if (re.getOperator() == RelationalExpression.Operator.EQUAL) {
- System.err.println("Left: " + re.getLeft());
- System.err.println("Right: " + re.getRight());
if (re.getLeft().getTag() == ConstraintExpression.ExpressionTag.PARTITION_COUNT) {
return getNumPartitions(oid, (PartitionCountExpression) re.getLeft(), re.getRight());
} else if (re.getRight().getTag() == ConstraintExpression.ExpressionTag.PARTITION_COUNT) {
@@ -209,4 +205,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..123fedf
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+/**
+ * @author jarodwen
+ */
+public class AvgAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private final int avgField;
+ private int outField = -1;
+
+ public AvgAggregatorDescriptorFactory(int avgField) {
+ this.avgField = avgField;
+ }
+
+ public AvgAggregatorDescriptorFactory(int avgField, int outField) {
+ this.avgField = avgField;
+ this.outField = outField;
+ }
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
+ */
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException {
+
+ if (this.outField < 0)
+ this.outField = keyFields.length;
+
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+ int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+ int count = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
+
+ try {
+ tupleBuilder.getDataOutput().writeInt(sum / count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+ int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+ int count = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
+ try {
+ tupleBuilder.getDataOutput().writeInt(sum);
+ tupleBuilder.getDataOutput().writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ // Init aggregation value
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, avgField);
+ int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+ int count = 1;
+
+ try {
+ tupleBuilder.getDataOutput().writeInt(sum);
+ tupleBuilder.getDataOutput().writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+ int sum1 = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+ int count1 = 1;
+
+ int sum2 = IntegerSerializerDeserializer.getInt(data, offset);
+ int count2 = IntegerSerializerDeserializer.getInt(data, offset + 4);
+
+ ByteBuffer buf = ByteBuffer.wrap(data, offset, 8);
+ buf.putInt(sum1 + sum2);
+ buf.putInt(count1 + count2);
+
+ return 8;
+ }
+ };
+ }
+
+}
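The AVG aggregator above packs its partial state as two consecutive 4-byte integers, the running sum followed by the running count, which is why aggregate() merges eight bytes and outputResult divides the two. A minimal sketch of that packing, separate from the patch and using hypothetical helper names:

    import java.nio.ByteBuffer;

    // Minimal sketch (not part of this patch) of the AVG partial-state layout:
    // [sum:int32][count:int32]. All names here are hypothetical.
    final class AvgPartialStateSketch {
        // Seed the state for a single input value.
        static byte[] init(int value) {
            ByteBuffer buf = ByteBuffer.allocate(8);
            buf.putInt(value); // running sum
            buf.putInt(1);     // running count
            return buf.array();
        }

        // Fold another input value into the 8-byte state stored at the given offset.
        static void merge(byte[] state, int offset, int value) {
            ByteBuffer buf = ByteBuffer.wrap(state);
            buf.putInt(offset, buf.getInt(offset) + value);
            buf.putInt(offset + 4, buf.getInt(offset + 4) + 1);
        }

        // Produce the final average from the packed state.
        static int finish(byte[] state, int offset) {
            ByteBuffer buf = ByteBuffer.wrap(state);
            return buf.getInt(offset) / buf.getInt(offset + 4);
        }
    }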
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..ef6a139
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+
+/**
+ * @author jarodwen
+ */
+public class ConcatAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int INIT_ACCUMULATORS_SIZE = 8;
+
+ private final int concatField;
+ private int outField = -1;
+
+ /**
+ * Initialize the aggregator with the index of the field to be concatenated.
+ *
+ * @param concatField
+ */
+ public ConcatAggregatorDescriptorFactory(int concatField) {
+ this.concatField = concatField;
+ }
+
+ /**
+ * Initialize the aggregator with the index of the field to be concatenated and
+ * the field where the aggregation result will be written.
+ *
+ * @param concatField
+ * @param outField
+ */
+ public ConcatAggregatorDescriptorFactory(int concatField, int outField) {
+ this.concatField = concatField;
+ this.outField = outField;
+ }
+
+ /**
+ * Create a concatenation aggregator. A byte array buffer is allocated inside the
+ * aggregator to hold the partial aggregation results, and an int reference into
+ * that buffer is written to the output frame so the result can be looked up later.
+ */
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
+
+ if (this.outField < 0)
+ this.outField = keyFields.length;
+
+ return new IAggregatorDescriptor() {
+
+ byte[][] buf = new byte[INIT_ACCUMULATORS_SIZE][];
+
+ int currentAggregatorIndex = -1;
+ int aggregatorCount = 0;
+
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ // Initialize the aggregation value
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
+ int fieldLength = accessor.getFieldLength(tIndex, concatField);
+ int appendOffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+ // Get the initial value
+ currentAggregatorIndex++;
+ if (currentAggregatorIndex >= buf.length) {
+ byte[][] newBuf = new byte[buf.length * 2][];
+ for (int i = 0; i < buf.length; i++) {
+ newBuf[i] = buf[i];
+ }
+ this.buf = newBuf;
+ }
+ buf[currentAggregatorIndex] = new byte[fieldLength];
+ System.arraycopy(accessor.getBuffer().array(), appendOffset, buf[currentAggregatorIndex], 0,
+ fieldLength);
+ // Update the aggregator index
+ aggregatorCount++;
+
+ try {
+ tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentAggregatorIndex);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void reset() {
+ currentAggregatorIndex = -1;
+ aggregatorCount = 0;
+ }
+
+ @Override
+ public void close() {
+ currentAggregatorIndex = -1;
+ aggregatorCount = 0;
+ for (int i = 0; i < buf.length; i++) {
+ buf[i] = null;
+ }
+ }
+
+ @Override
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+ throws HyracksDataException {
+ int refIndex = IntegerSerializerDeserializer.getInt(data, offset);
+ // FIXME Should be done in binary way
+ StringBuilder sbder = new StringBuilder();
+ sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(buf[refIndex]))));
+ // Get the new data
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
+ int fieldLength = accessor.getFieldLength(tIndex, concatField);
+ sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset
+ + accessor.getFieldSlotsLength() + fieldStart, fieldLength))));
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize(sbder.toString(), new DataOutputStream(baos));
+ buf[refIndex] = baos.toByteArray();
+ return 4;
+ }
+
+ @Override
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+ int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset
+ + accessor.getFieldSlotsLength() + fieldStart);
+
+ try {
+ if (refIndex >= 0)
+ tupleBuilder.getDataOutput().write(buf[refIndex]);
+ else {
+ int fieldLength = accessor.getFieldLength(tIndex, outField);
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4, fieldLength - 4);
+ }
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, outField);
+ int fieldLength = accessor.getFieldLength(tIndex, outField);
+ int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset
+ + accessor.getFieldSlotsLength() + fieldOffset);
+
+ try {
+ tupleBuilder.getDataOutput().writeInt(-1);
+ if (refIndex < 0) {
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldOffset + 4, fieldLength - 4);
+ } else {
+ tupleBuilder.getDataOutput().write(buf[refIndex], 0, buf[refIndex].length);
+ }
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
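The concat aggregator keeps the growing string in an in-memory byte[][] and stores only an int reference to it in the frame; when a partial result must be written out, it emits a -1 sentinel followed by the serialized bytes, and outputResult tells the two cases apart by the sign of the stored reference. A small sketch of those two encodings, separate from the patch and with hypothetical names:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Minimal sketch (not part of this patch) of the two value encodings the concat
    // aggregator distinguishes by the sign of the leading int. Names are hypothetical.
    final class ConcatEncodingSketch {
        // In-memory case: the frame carries only an index into the aggregator's buf[][].
        static byte[] encodeReference(int refIndex) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            new DataOutputStream(baos).writeInt(refIndex); // refIndex >= 0
            return baos.toByteArray();
        }

        // Spilled case: a -1 sentinel followed by the serialized string bytes, so the
        // reader skips the 4-byte header and consumes the rest inline.
        static byte[] encodeInline(byte[] serializedString) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);
            out.write(serializedString);
            return baos.toByteArray();
        }
    }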
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..3a42fbc
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+/**
+ * @author jarodwen
+ */
+public class CountAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private int outField = -1;
+
+ public CountAggregatorDescriptorFactory() {
+ }
+
+ public CountAggregatorDescriptorFactory(int outField) {
+ this.outField = outField;
+ }
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
+ */
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
+
+ if (this.outField < 0) {
+ this.outField = keyFields.length;
+ }
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, 1);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+ throws HyracksDataException {
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ int count = buf.getInt(offset);
+ buf.putInt(offset, count + 1);
+ return 4;
+ }
+
+ @Override
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+ try {
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write int sum as a partial result.");
+ }
+ }
+
+ @Override
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+ try {
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write int sum as a partial result.");
+ }
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+ };
+ }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
new file mode 100644
index 0000000..dfc31cd
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public interface IAggregatorDescriptor {
+
+ /**
+ * Initialize the aggregator with an input tuple specified by the input
+ * frame and tuple index. This function will write the initialized partial
+ * result into the tuple builder.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param tupleBuilder
+ * @throws HyracksDataException
+ */
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException;
+
+ /**
+ * Aggregate the input tuple into the partial result stored in the given bytes.
+ * The new value is then written back into the same byte range.
+ * It is the developer's responsibility to ensure that the new result does not
+ * exceed the given length.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param data
+ * @param offset
+ * @param length
+ * @return
+ * @throws HyracksDataException
+ */
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+ throws HyracksDataException;
+
+ /**
+ * Output the partial aggregation result to an array tuple builder.
+ * Any state needed to continue the aggregation must be included.
+ * For example, an aggregator calculating AVG should carry both the running
+ * sum and the count in its partial result.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param tupleBuilder
+ * @throws HyracksDataException
+ */
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException;
+
+ /**
+ * Output the final aggregation result to an array tuple builder.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param tupleBuilder
+ * @throws HyracksDataException
+ */
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException;
+
+ /**
+ * Reset the internal state of the aggregator.
+ */
+ public void reset();
+
+ /**
+ * Close the aggregator. Necessary clean-up code should be implemented here.
+ */
+ public void close();
+
+}
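IAggregatorDescriptor is driven by a grouping operator in a fixed order: init() seeds the partial result for the first tuple of a group, aggregate() folds each further tuple into the stored partial bytes, and outputPartialResult() or outputResult() emits the spillable or final form. A rough driver sketch under those assumptions; the variable names are hypothetical and where the partial bytes actually live is simplified here:

    import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
    import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptor;

    // Rough driver sketch (not part of this patch); names are hypothetical.
    final class AggregatorLifecycleSketch {
        static void processGroup(IAggregatorDescriptor agg, IFrameTupleAccessor accessor,
                int firstTuple, int lastTuple, ArrayTupleBuilder tupleBuilder,
                byte[] partialBytes, int offset, int length) throws HyracksDataException {
            agg.init(accessor, firstTuple, tupleBuilder);                 // seed the partial result
            for (int t = firstTuple + 1; t <= lastTuple; t++) {
                agg.aggregate(accessor, t, partialBytes, offset, length); // fold in each tuple
            }
            agg.outputPartialResult(accessor, lastTuple, tupleBuilder);   // when spilling to a run
            // or: agg.outputResult(accessor, lastTuple, tupleBuilder);   // at the final merge
            agg.reset();                                                  // ready for the next group
        }
    }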
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..e324e1c
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IAggregatorDescriptorFactory extends Serializable {
+
+ /**
+ * Create an aggregator.
+ *
+ * @param ctx
+ * @param inRecordDescriptor
+ * @param outRecordDescriptor
+ * @param keyFields
+ * @return
+ * @throws HyracksDataException
+ */
+ IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException;
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..85a182f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+/**
+ * @author jarodwen
+ */
+public class IntSumAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final int aggField;
+ private int outField = -1;
+
+ public IntSumAggregatorDescriptorFactory(int aggField) {
+ this.aggField = aggField;
+ }
+
+ public IntSumAggregatorDescriptorFactory(int aggField, int outField) {
+ this.aggField = aggField;
+ this.outField = outField;
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
+
+ if (this.outField < 0) {
+ this.outField = keyFields.length;
+ }
+
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int sum = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+
+ tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, sum);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+ throws HyracksDataException {
+ int sum = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+ // Update the value of tuple 2
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ sum += buf.getInt(offset);
+ buf.putInt(offset, sum);
+ return 4;
+ }
+
+ @Override
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+ try {
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write int sum as a partial result.");
+ }
+ }
+
+ @Override
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+ try {
+ tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write int sum as a partial result.");
+ }
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+ };
+ }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..d30b2b2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * @author jarodwen
+ */
+public class MultiAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IAggregatorDescriptorFactory[] aggregatorFactories;
+
+ public MultiAggregatorDescriptorFactory(IAggregatorDescriptorFactory[] aggregatorFactories) {
+ this.aggregatorFactories = aggregatorFactories;
+ }
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
+ */
+ @Override
+ public IAggregatorDescriptor createAggregator(final IHyracksStageletContext ctx,
+ final RecordDescriptor inRecordDescriptor, final RecordDescriptor outRecordDescriptor, final int[] keyFields)
+ throws HyracksDataException {
+
+ final IAggregatorDescriptor[] aggregators = new IAggregatorDescriptor[this.aggregatorFactories.length];
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i] = aggregatorFactories[i].createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
+ keyFields);
+ }
+
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].init(accessor, tIndex, tupleBuilder);
+ }
+ }
+
+ @Override
+ public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+ throws HyracksDataException {
+ int adjust = 0;
+ for (int i = 0; i < aggregators.length; i++) {
+ adjust += aggregators[i].aggregate(accessor, tIndex, data, offset + adjust, length - adjust);
+ }
+ return adjust;
+ }
+
+ @Override
+ public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].outputPartialResult(accessor, tIndex, tupleBuilder);
+ }
+ }
+
+ @Override
+ public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].outputResult(accessor, tIndex, tupleBuilder);
+ }
+ }
+
+ @Override
+ public void close() {
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].close();
+ }
+ }
+
+ @Override
+ public void reset() {
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].reset();
+ }
+ }
+
+ };
+ }
+}
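MultiAggregatorDescriptorFactory composes per-field aggregators; each delegate's aggregate() returns how many bytes of the packed partial state it consumed, and the composite uses that return value to advance the offset for the next delegate. A hedged usage sketch, where the chosen aggregation field index is only an example:

    import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
    import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
    import edu.uci.ics.hyracks.dataflow.std.aggregators.IntSumAggregatorDescriptorFactory;
    import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorDescriptorFactory;

    // Usage sketch (not part of this patch): a composite that keeps a 4-byte running count
    // followed by a 4-byte running sum of field 1; the field index is only an example.
    final class CompositeAggregatorSketch {
        static IAggregatorDescriptorFactory countAndSum() {
            return new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
                    new CountAggregatorDescriptorFactory(),
                    new IntSumAggregatorDescriptorFactory(1)
            });
        }
    }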
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
index 957c453..eb9c12d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregator;
import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.ISpillableAccumulatingAggregator;
public class MultiAggregatorFactory implements IAccumulatingAggregatorFactory {
private static final long serialVersionUID = 1L;
@@ -36,92 +35,6 @@
}
@Override
- public ISpillableAccumulatingAggregator createSpillableAggregator(IHyracksStageletContext ctx,
- RecordDescriptor inRecordDesc, final RecordDescriptor outRecordDescriptor) {
- final ISpillableFieldValueResultingAggregator aggregators[] = new ISpillableFieldValueResultingAggregator[aFactories.length];
- for (int i = 0; i < aFactories.length; ++i) {
- aggregators[i] = aFactories[i].createSpillableFieldValueResultingAggregator();
- }
- final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
- return new ISpillableAccumulatingAggregator() {
- private boolean pending;
-
- @Override
- public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex,
- int[] keyFieldIndexes) throws HyracksDataException {
- if (!pending) {
- tb.reset();
- for (int i = 0; i < keyFieldIndexes.length; ++i) {
- tb.addField(accessor, tIndex, keyFieldIndexes[i]);
- }
- DataOutput dos = tb.getDataOutput();
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].output(dos);
- tb.addFieldEndOffset();
- }
- }
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- pending = true;
- return false;
- }
- return true;
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- tb.reset();
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].init(accessor, tIndex);
- }
- pending = false;
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].accumulate(accessor, tIndex);
- }
- }
-
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
- throws HyracksDataException {
- tb.reset();
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].initFromPartial(accessor, tIndex, keyFieldIndexes.length + i);
- }
- pending = false;
- }
-
- @Override
- public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].accumulatePartialResult(accessor, tIndex, keyFieldIndexes.length + i);
- }
-
- }
-
- @Override
- public boolean output(FrameTupleAppender appender, ArrayTupleBuilder tbder) throws HyracksDataException {
- if (!pending) {
- // TODO Here to be fixed:
- DataOutput dos = tbder.getDataOutput();
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].output(dos);
- tbder.addFieldEndOffset();
- }
- }
- if (!appender.append(tbder.getFieldEndOffsets(), tbder.getByteArray(), 0, tbder.getSize())) {
- pending = true;
- return false;
- }
- return true;
- }
- };
- }
-
- @Override
public IAccumulatingAggregator createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor) throws HyracksDataException {
final IFieldValueResultingAggregator aggregators[] = new IFieldValueResultingAggregator[aFactories.length];
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordReader.java
index 4516c8d..5d94c12 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordReader.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordReader.java
@@ -16,8 +16,8 @@
public interface IRecordReader {
- public boolean read(Object[] record) throws Exception;
+ public boolean read(Object[] record) throws Exception;
- public void close();
-
+ public void close();
+
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordWriter.java
index a319727..679088f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordWriter.java
@@ -17,10 +17,9 @@
import java.io.File;
public interface IRecordWriter {
-
- public void close();
-
- public void write(Object[] record) throws Exception;
-
+ public void close();
+
+ public void write(Object[] record) throws Exception;
+
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 530207d..d599fee 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -63,7 +63,7 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
// Output files
final FileSplit[] splits = fileSplitProvider.getFileSplits();
// Frame accessor
@@ -95,7 +95,7 @@
frameTupleAccessor.reset(buffer);
for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
int start = frameTupleAccessor.getTupleStartOffset(tIndex)
- + frameTupleAccessor.getFieldSlotsLength();
+ + frameTupleAccessor.getFieldSlotsLength();
bbis.setByteBuffer(buffer, start);
Object[] record = new Object[recordDescriptor.getFields().length];
for (int i = 0; i < record.length; ++i) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
index aa6635d..38ac1a3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
@@ -66,14 +66,14 @@
protected IRecordReader createRecordReader(File file, RecordDescriptor desc) throws Exception {
return new RecordReaderImpl(file, desc);
}
-
+
@Override
- protected void configure() throws Exception {
- // currently a no-op, but is meant to initialize , if required before it is asked
- // to create a record reader
- // this is executed at the node and is useful for operators that could not be
- // initialized from the client completely, because of lack of information specific
- // to the node where the operator gets executed.
-
- }
+ protected void configure() throws Exception {
+ // Currently a no-op, but is meant to perform any initialization required before the
+ // operator is asked to create a record reader.
+ // This is executed at the node and is useful for operators that could not be
+ // initialized completely from the client because of a lack of information specific
+ // to the node where the operator gets executed.
+
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordWriter.java
index 1c52b25..1858a73 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordWriter.java
@@ -14,73 +14,72 @@
*/
package edu.uci.ics.hyracks.dataflow.std.file;
-import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
-public abstract class RecordWriter implements IRecordWriter{
+import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
-
- protected final BufferedWriter bufferedWriter;
+public abstract class RecordWriter implements IRecordWriter {
+
+ protected final BufferedWriter bufferedWriter;
protected final int[] columns;
protected final char separator;
-
- public static final char COMMA = ',';
-
- public RecordWriter(Object [] args) throws Exception{
- OutputStream outputStream = createOutputStream(args);
- if(outputStream != null){
- bufferedWriter = new BufferedWriter(new OutputStreamWriter(createOutputStream(args)));
- }else{
- bufferedWriter = null;
- }
- this.columns = null;
- this.separator = COMMA;
- }
-
- public RecordWriter(int []columns, char separator, Object[] args) throws Exception{
- OutputStream outputStream = createOutputStream(args);
- if(outputStream != null){
- bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream));
- }else{
- bufferedWriter = null;
- }
- this.columns = columns;
- this.separator = separator;
- }
-
- @Override
- public void close() {
- try {
- bufferedWriter.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void write(Object[] record) throws Exception {
- if (columns == null) {
- for (int i = 0; i < record.length; ++i) {
- if (i != 0) {
- bufferedWriter.write(separator);
- }
- bufferedWriter.write(String.valueOf(record[i]));
- }
- } else {
- for (int i = 0; i < columns.length; ++i) {
- if (i != 0) {
- bufferedWriter.write(separator);
- }
- bufferedWriter.write(String.valueOf(record[columns[i]]));
- }
- }
- bufferedWriter.write("\n");
- }
-
- public abstract OutputStream createOutputStream(Object[] args) throws Exception;
-
+ public static final char COMMA = ',';
+
+ public RecordWriter(Object[] args) throws Exception {
+ OutputStream outputStream = createOutputStream(args);
+ if (outputStream != null) {
+ bufferedWriter = new BufferedWriter(new OutputStreamWriter(createOutputStream(args)));
+ } else {
+ bufferedWriter = null;
+ }
+ this.columns = null;
+ this.separator = COMMA;
+ }
+
+ public RecordWriter(int[] columns, char separator, Object[] args) throws Exception {
+ OutputStream outputStream = createOutputStream(args);
+ if (outputStream != null) {
+ bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream));
+ } else {
+ bufferedWriter = null;
+ }
+ this.columns = columns;
+ this.separator = separator;
+ }
+
+ @Override
+ public void close() {
+ try {
+ bufferedWriter.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void write(Object[] record) throws Exception {
+ if (columns == null) {
+ for (int i = 0; i < record.length; ++i) {
+ if (i != 0) {
+ bufferedWriter.write(separator);
+ }
+ bufferedWriter.write(StringSerializationUtils.toString(record[i]));
+ }
+ } else {
+ for (int i = 0; i < columns.length; ++i) {
+ if (i != 0) {
+ bufferedWriter.write(separator);
+ }
+ bufferedWriter.write(StringSerializationUtils.toString(record[columns[i]]));
+ }
+ }
+ bufferedWriter.write("\n");
+ }
+
+ public abstract OutputStream createOutputStream(Object[] args) throws Exception;
+
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index 4b7aff3..fc5f282 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -23,98 +23,97 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-public class DeserializedPreclusteredGroupOperator implements
- IOpenableDataWriterOperator {
- private final int[] groupFields;
+public class DeserializedPreclusteredGroupOperator implements IOpenableDataWriterOperator {
+ private final int[] groupFields;
- private final IComparator[] comparators;
+ private final IComparator[] comparators;
- private final IGroupAggregator aggregator;
+ private final IGroupAggregator aggregator;
- private Object[] lastData;
+ private Object[] lastData;
- private IOpenableDataWriter<Object[]> writer;
+ private IOpenableDataWriter<Object[]> writer;
- private List<Object[]> buffer;
+ private List<Object[]> buffer;
- private IOpenableDataReader<Object[]> reader;
+ private IOpenableDataReader<Object[]> reader;
- public DeserializedPreclusteredGroupOperator(int[] groupFields,
- IComparator[] comparators, IGroupAggregator aggregator) {
- this.groupFields = groupFields;
- this.comparators = comparators;
- this.aggregator = aggregator;
- buffer = new ArrayList<Object[]>();
- reader = new IOpenableDataReader<Object[]>() {
- private int idx;
+ public DeserializedPreclusteredGroupOperator(int[] groupFields, IComparator[] comparators,
+ IGroupAggregator aggregator) {
+ this.groupFields = groupFields;
+ this.comparators = comparators;
+ this.aggregator = aggregator;
+ buffer = new ArrayList<Object[]>();
+ reader = new IOpenableDataReader<Object[]>() {
+ private int idx;
- @Override
- public void open() {
- idx = 0;
- }
+ @Override
+ public void open() {
+ idx = 0;
+ }
- @Override
- public void close() {
- }
+ @Override
+ public void close() {
+ }
- @Override
- public Object[] readData() {
- return idx >= buffer.size() ? null : buffer.get(idx++);
- }
- };
- }
+ @Override
+ public Object[] readData() {
+ return idx >= buffer.size() ? null : buffer.get(idx++);
+ }
+ };
+ }
- @Override
- public void close() throws HyracksDataException {
- if (!buffer.isEmpty()) {
- aggregate();
- }
- writer.close();
- try {
- aggregator.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void close() throws HyracksDataException {
+ if (!buffer.isEmpty()) {
+ aggregate();
+ }
+ writer.close();
+ try {
+ aggregator.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
- private void aggregate() throws HyracksDataException {
- reader.open();
- aggregator.aggregate(reader, writer);
- reader.close();
- buffer.clear();
- }
+ private void aggregate() throws HyracksDataException {
+ reader.open();
+ aggregator.aggregate(reader, writer);
+ reader.close();
+ buffer.clear();
+ }
- @Override
- public void open() throws HyracksDataException {
- lastData = null;
- writer.open();
- }
+ @Override
+ public void open() throws HyracksDataException {
+ lastData = null;
+ writer.open();
+ }
- @Override
- public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
- if (index != 0) {
- throw new IllegalArgumentException();
- }
- this.writer = writer;
- }
+ @Override
+ public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
+ if (index != 0) {
+ throw new IllegalArgumentException();
+ }
+ this.writer = writer;
+ }
- @Override
- public void writeData(Object[] data) throws HyracksDataException {
- if (lastData != null && compare(data, lastData) != 0) {
- aggregate();
- }
- lastData = data;
- buffer.add(data);
- }
+ @Override
+ public void writeData(Object[] data) throws HyracksDataException {
+ if (lastData != null && compare(data, lastData) != 0) {
+ aggregate();
+ }
+ lastData = data;
+ buffer.add(data);
+ }
- private int compare(Object[] d1, Object[] d2) {
- for (int i = 0; i < groupFields.length; ++i) {
- int fIdx = groupFields[i];
- int c = comparators[i].compare(d1[fIdx], d2[fIdx]);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
+ private int compare(Object[] d1, Object[] d2) {
+ for (int i = 0; i < groupFields.length; ++i) {
+ int fIdx = groupFields[i];
+ int c = comparators[i].compare(d1[fIdx], d2[fIdx]);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
new file mode 100644
index 0000000..cdb0f4d
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -0,0 +1,602 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ /**
+ * The in-memory group table identifier (in the job environment)
+ */
+ private static final String GROUPTABLES = "gtables";
+ /**
+ * The run files identifier (in the job environment)
+ */
+ private static final String RUNS = "runs";
+ private final int[] keyFields;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final INormalizedKeyComputerFactory firstNormalizerFactory;
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+ private final IAggregatorDescriptorFactory mergeFactory;
+ private final int framesLimit;
+ private final ISpillableTableFactory spillableTableFactory;
+ private final boolean isOutputSorted;
+
+ public ExternalGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+ IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
+ IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergeFactory,
+ RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
+ super(spec, 1, 1);
+ this.framesLimit = framesLimit;
+ if (framesLimit <= 1) {
+ // Minimum of 2 frames: 1 for input records, and 1 for output
+ // aggregation results.
+ throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
+ }
+
+ this.aggregatorFactory = aggregatorFactory;
+ this.mergeFactory = mergeFactory;
+ this.keyFields = keyFields;
+ this.comparatorFactories = comparatorFactories;
+ this.firstNormalizerFactory = firstNormalizerFactory;
+ this.spillableTableFactory = spillableTableFactory;
+ this.isOutputSorted = isOutputSorted;
+
+ // Set the record descriptor. Note that since
+ // this operator is a unary operator,
+ // only the first record descriptor is used here.
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ @Override
+ public void contributeTaskGraph(IActivityGraphBuilder builder) {
+ AggregateActivity aggregateAct = new AggregateActivity();
+ MergeActivity mergeAct = new MergeActivity();
+
+ builder.addTask(aggregateAct);
+ builder.addSourceEdge(0, aggregateAct, 0);
+
+ builder.addTask(mergeAct);
+ builder.addTargetEdge(0, mergeAct, 0);
+
+ builder.addBlockingEdge(aggregateAct, mergeAct);
+ }
+
+ private class AggregateActivity extends AbstractActivityNode {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return ExternalGroupOperatorDescriptor.this;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+ final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) throws HyracksDataException {
+ final ISpillableTable gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields,
+ comparatorFactories, firstNormalizerFactory, aggregatorFactory,
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+ ExternalGroupOperatorDescriptor.this.framesLimit);
+ final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+
+ /**
+ * Run files
+ */
+ private LinkedList<RunFileReader> runs;
+
+ @Override
+ public void open() throws HyracksDataException {
+ runs = new LinkedList<RunFileReader>();
+ gTable.reset();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ // If the group table is too large, flush the table into
+ // a run file.
+ if (!gTable.insert(accessor, i)) {
+ flushFramesToRun();
+ if (!gTable.insert(accessor, i))
+ throw new HyracksDataException(
+ "Failed to insert a tuple into the group table, even after flushing it to a run file!");
+ }
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
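+ // Nothing to do here: results are published to the environment in close().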
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (gTable.getFrameCount() >= 0) {
+ if (runs.size() <= 0) {
+ // All in memory
+ env.set(GROUPTABLES, gTable);
+ } else {
+ // flush the memory into the run file.
+ flushFramesToRun();
+ gTable.close();
+ }
+ }
+ env.set(RUNS, runs);
+ }
+
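+ /**
+ * Sort the in-memory group table and flush it into a new run file,
+ * then reset the table so aggregation can continue.
+ */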
+ private void flushFramesToRun() throws HyracksDataException {
+ FileReference runFile;
+ try {
+ runFile = ctx.getJobletContext().createWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class.getSimpleName());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
+ writer.open();
+ try {
+ gTable.sortFrames();
+ gTable.flushFrames(writer, true);
+ } catch (Exception ex) {
+ throw new HyracksDataException(ex);
+ } finally {
+ writer.close();
+ }
+ gTable.reset();
+ runs.add(writer.createReader());
+ }
+ };
+ return op;
+ }
+ }
+
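+ /**
+ * The merge activity repeatedly merges the sorted runs produced by
+ * the aggregate activity, re-aggregating records with equal keys,
+ * until the remaining runs fit in memory and the final pass can write
+ * directly to the downstream writer.
+ */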
+ private class MergeActivity extends AbstractActivityNode {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return ExternalGroupOperatorDescriptor.this;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+ final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) throws HyracksDataException {
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ final IAggregatorDescriptor currentWorkingAggregator = mergeFactory.createAggregator(ctx,
+ recordDescriptors[0], recordDescriptors[0], keyFields);
+ final int[] storedKeys = new int[keyFields.length];
+ // Get the list of the fields in the stored records.
+ for (int i = 0; i < keyFields.length; ++i) {
+ storedKeys[i] = i;
+ }
+ // Tuple builder
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
+
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+ /**
+ * Input frames, one for each run file.
+ */
+ private List<ByteBuffer> inFrames;
+
+ /**
+ * Output frame.
+ */
+ private ByteBuffer outFrame, writerFrame;
+
+ /**
+ * List of the run files to be merged
+ */
+ private LinkedList<RunFileReader> runs;
+
+ /**
+ * The number of frames to read ahead from each run in one batch.
+ */
+ private int runFrameLimit = 1;
+
+ private int[] currentFrameIndexInRun;
+ private int[] currentRunFrames;
+ private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescriptors[0]);
+ private ArrayTupleBuilder finalTupleBuilder;
+ private FrameTupleAppender writerFrameAppender;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initialize() throws HyracksDataException {
+ runs = (LinkedList<RunFileReader>) env.get(RUNS);
+ writer.open();
+ try {
+ if (runs.size() <= 0) {
+ ISpillableTable gTable = (ISpillableTable) env.get(GROUPTABLES);
+ if (gTable != null) {
+ if (isOutputSorted)
+ gTable.sortFrames();
+ gTable.flushFrames(writer, false);
+ }
+ env.set(GROUPTABLES, null);
+ } else {
+ long start = System.currentTimeMillis();
+ inFrames = new ArrayList<ByteBuffer>();
+ outFrame = ctx.allocateFrame();
+ outFrameAppender.reset(outFrame, true);
+ outFrameAccessor.reset(outFrame);
+ while (runs.size() > 0) {
+ try {
+ doPass(runs);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ inFrames.clear();
+ long end = System.currentTimeMillis();
+ System.out.println("merge time " + (end - start));
+ }
+ } finally {
+ writer.close();
+ }
+ env.set(RUNS, null);
+ }
+
+ private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
+ FileReference newRun = null;
+ IFrameWriter writer = this.writer;
+ boolean finalPass = false;
+
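+ // Two frames are kept out of the input budget for output buffering
+ // (the merge output frame and the writer frame); the remaining
+ // frames are shared among the input runs of this pass.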
+ while (inFrames.size() + 2 < framesLimit) {
+ inFrames.add(ctx.allocateFrame());
+ }
+ int runNumber;
+ if (runs.size() + 2 <= framesLimit) {
+ finalPass = true;
+ runFrameLimit = (framesLimit - 2) / runs.size();
+ runNumber = runs.size();
+ } else {
+ runNumber = framesLimit - 2;
+ newRun = ctx.getJobletContext().createWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class.getSimpleName());
+ writer = new RunFileWriter(newRun, ctx.getIOManager());
+ writer.open();
+ }
+ try {
+ currentFrameIndexInRun = new int[runNumber];
+ currentRunFrames = new int[runNumber];
+ // Create file readers for the input run files, only for the
+ // runs that fit into the inFrames of this pass
+ RunFileReader[] runFileReaders = new RunFileReader[runNumber];
+ // Create input frame accessor
+ FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+ // Build a priority queue for extracting tuples in order
+ Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+
+ ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
+ recordDescriptors[0], runNumber, comparator);
+ // Maintain the visiting tuple index within the current frame
+ // of each run
+ int[] tupleIndices = new int[runNumber];
+ for (int runIndex = runNumber - 1; runIndex >= 0; runIndex--) {
+ tupleIndices[runIndex] = 0;
+ // Load the run file
+ runFileReaders[runIndex] = runs.get(runIndex);
+ runFileReaders[runIndex].open();
+
+ currentRunFrames[runIndex] = 0;
+ currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
+ for (int j = 0; j < runFrameLimit; j++) {
+ int frameIndex = currentFrameIndexInRun[runIndex] + j;
+ if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
+ tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescriptors[0]);
+ tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+ currentRunFrames[runIndex]++;
+ if (j == 0)
+ setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors,
+ topTuples);
+ } else {
+ break;
+ }
+ }
+ }
+
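+ // Because every run is sorted on the grouping keys, the priority
+ // queue yields tuples in key order, so records of the same group
+ // arrive consecutively and can be merged into the last record of
+ // the output frame.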
+ // Start merging
+ while (!topTuples.areRunsExhausted()) {
+ // Get the top record
+ ReferenceEntry top = topTuples.peek();
+ int tupleIndex = top.getTupleIndex();
+ int runIndex = topTuples.peek().getRunid();
+ FrameTupleAccessor fta = top.getAccessor();
+
+ int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+ if (currentTupleInOutFrame < 0
+ || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+ // Start a new output record for this group:
+ // reset the tuple builder and copy the grouping keys
+ tupleBuilder.reset();
+ for (int i = 0; i < keyFields.length; i++) {
+ tupleBuilder.addField(fta, tupleIndex, i);
+ }
+
+ currentWorkingAggregator.init(fta, tupleIndex, tupleBuilder);
+ if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ // The output frame is full: flush it (so every
+ // record in it is in a consistent state) and
+ // retry the append
+ flushOutFrame(writer, finalPass);
+ if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()))
+ throw new HyracksDataException(
+ "Failed to append an aggregation result to the output frame.");
+ }
+ } else {
+ // The new tuple belongs to the same group as the
+ // last record in the output frame: merge it into
+ // that record's aggregate field
+ int tupleOffset = outFrameAccessor.getTupleStartOffset(currentTupleInOutFrame);
+ int fieldOffset = outFrameAccessor.getFieldStartOffset(currentTupleInOutFrame,
+ keyFields.length);
+ int fieldLength = outFrameAccessor.getFieldLength(currentTupleInOutFrame,
+ keyFields.length);
+ currentWorkingAggregator.aggregate(fta, tupleIndex, outFrameAccessor.getBuffer()
+ .array(), tupleOffset + outFrameAccessor.getFieldSlotsLength() + fieldOffset,
+ fieldLength);
+ }
+ tupleIndices[runIndex]++;
+ setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+ }
+ // Flush the outFrame
+ if (outFrameAppender.getTupleCount() > 0) {
+ flushOutFrame(writer, finalPass);
+ }
+ // After processing all records, flush the aggregator
+ currentWorkingAggregator.close();
+ runs.subList(0, runNumber).clear();
+ // insert the new run file into the beginning of the run
+ // file list
+ if (!finalPass) {
+ runs.add(0, ((RunFileWriter) writer).createReader());
+ }
+ } finally {
+ if (!finalPass) {
+ writer.close();
+ }
+ }
+ }
+
+ private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
+ if (finalTupleBuilder == null) {
+ finalTupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
+ }
+ if (writerFrame == null) {
+ writerFrame = ctx.allocateFrame();
+ }
+ if (writerFrameAppender == null) {
+ writerFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ writerFrameAppender.reset(writerFrame, true);
+ }
+ outFrameAccessor.reset(outFrame);
+ for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+ finalTupleBuilder.reset();
+ for (int j = 0; j < keyFields.length; j++) {
+ finalTupleBuilder.addField(outFrameAccessor, i, j);
+ }
+ if (isFinal)
+ currentWorkingAggregator.outputResult(outFrameAccessor, i, finalTupleBuilder);
+ else
+ currentWorkingAggregator.outputPartialResult(outFrameAccessor, i, finalTupleBuilder);
+
+ if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerFrameAppender.reset(writerFrame, true);
+ if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize()))
+ throw new HyracksDataException(
+ "Failed to write final aggregation result to a writer frame!");
+ }
+ }
+ if (writerFrameAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerFrameAppender.reset(writerFrame, true);
+ }
+ outFrameAppender.reset(outFrame, true);
+ }
+
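+ /**
+ * Advance the cursor of the given run: move to the next tuple within
+ * its read-ahead window of frames, reload a new batch of frames when
+ * the window is exhausted, and update the priority queue accordingly.
+ */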
+ private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
+ throws HyracksDataException {
+ int runStart = runIndex * runFrameLimit;
+ boolean existNext = false;
+ if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
+ // run already closed
+ existNext = false;
+ } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
+ // not the last frame for this run
+ existNext = true;
+ if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+ tupleIndices[runIndex] = 0;
+ currentFrameIndexInRun[runIndex]++;
+ }
+ } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]]
+ .getTupleCount()) {
+ // tuples remain in the last loaded frame of this run
+ existNext = true;
+ } else {
+ // All loaded frames of this run are exhausted:
+ // rewind to the start of its frame window and
+ // reload the next batch from the run file
+ int frameOffset = runIndex * runFrameLimit;
+ tupleIndices[runIndex] = 0;
+ currentFrameIndexInRun[runIndex] = frameOffset;
+ // Read the next batch of frames for this run
+ currentRunFrames[runIndex] = 0;
+ for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
+ ByteBuffer buffer = tupleAccessors[frameOffset].getBuffer();
+ if (runCursors[runIndex].nextFrame(buffer)) {
+ tupleAccessors[frameOffset].reset(buffer);
+ if (tupleAccessors[frameOffset].getTupleCount() > 0) {
+ existNext = true;
+ } else {
+ throw new IllegalStateException("Unexpected empty frame read from a run file");
+ }
+ currentRunFrames[runIndex]++;
+ } else {
+ break;
+ }
+ }
+ }
+ // Update the priority queue depending on whether the run
+ // for the given runIndex still has tuples
+ if (existNext) {
+ topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]],
+ tupleIndices[runIndex]);
+ } else {
+ topTuples.pop();
+ closeRun(runIndex, runCursors, tupleAccessors);
+ }
+ }
+
+ /**
+ * Close the run file reader for the given run, if it is still
+ * open, and mark its cursor as closed.
+ *
+ * @param index
+ * @param runCursors
+ * @param tupleAccessor
+ * @throws HyracksDataException
+ */
+ private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+ throws HyracksDataException {
+ if (runCursors[index] != null) {
+ runCursors[index].close();
+ runCursors[index] = null;
+ }
+ }
+
+ private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ // Note: since this comparison is only used in the merge
+ // phase, all keys are clustered at the beginning of the
+ // tuple.
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldLength(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldLength(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+ };
+ return op;
+ }
+
+ private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+ return new Comparator<ReferenceEntry>() {
+
+ @Override
+ public int compare(ReferenceEntry o1, ReferenceEntry o2) {
+ FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
+ FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
+ int j1 = o1.getTupleIndex();
+ int j2 = o2.getTupleIndex();
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ // Note: since this comparison is only used in the merge
+ // phase, all keys are clustered at the beginning of the
+ // tuple.
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ };
+ }
+
+ }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
deleted file mode 100644
index b13b087..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,692 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
-
-/**
- * This is an implementation of the external hash group operator.
- * The motivation of this operator is that when tuples are processed in
- * parallel, distinguished aggregating keys partitioned on one node may exceed
- * the main memory, so aggregation results should be output onto the disk to
- * make space for aggregating more input tuples.
- */
-public class ExternalHashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
- /**
- * The input frame identifier (in the job environment)
- */
- private static final String GROUPTABLES = "gtables";
-
- /**
- * The runs files identifier (in the job environment)
- */
- private static final String RUNS = "runs";
-
- /**
- * The fields used for grouping (grouping keys).
- */
- private final int[] keyFields;
-
- /**
- * The comparator for checking the grouping conditions, corresponding to the {@link #keyFields}.
- */
- private final IBinaryComparatorFactory[] comparatorFactories;
-
- /**
- * The aggregator factory for the aggregating field, corresponding to the {@link #aggregateFields}.
- */
- private IAccumulatingAggregatorFactory aggregatorFactory;
-
- /**
- * The maximum number of frames in the main memory.
- */
- private final int framesLimit;
-
- /**
- * Indicate whether the final output will be sorted or not.
- */
- private final boolean sortOutput;
-
- /**
- * Partition computer factory
- */
- private final ITuplePartitionComputerFactory tpcf;
-
- /**
- * The size of the in-memory table, which should be specified now by the
- * creator of this operator descriptor.
- */
- private final int tableSize;
-
- /**
- * XXX Logger for debug information
- */
- private static Logger LOGGER = Logger.getLogger(ExternalHashGroupOperatorDescriptor.class.getName());
-
- /**
- * Constructor of the external hash group operator descriptor.
- *
- * @param spec
- * @param keyFields
- * The fields as keys of grouping.
- * @param framesLimit
- * The maximum number of frames to be used in memory.
- * @param sortOutput
- * Whether the output should be sorted or not. Note that if the
- * input data is large enough for external grouping, the output
- * will be sorted surely. The only case that when the output is
- * not sorted is when the size of the input data can be grouped
- * in memory and this parameter is false.
- * @param tpcf
- * The partitioner.
- * @param comparatorFactories
- * The comparators.
- * @param aggregatorFactory
- * The aggregators.
- * @param recordDescriptor
- * The record descriptor for the input data.
- * @param tableSize
- * The maximum size of the in memory table usable to this
- * operator.
- */
- public ExternalHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
- boolean sortOutput, ITuplePartitionComputerFactory tpcf, IBinaryComparatorFactory[] comparatorFactories,
- IAccumulatingAggregatorFactory aggregatorFactory, RecordDescriptor recordDescriptor, int tableSize) {
- super(spec, 1, 1);
- this.framesLimit = framesLimit;
- if (framesLimit <= 1) {
- // Minimum of 2 frames: 1 for input records, and 1 for output
- // aggregation results.
- throw new IllegalStateException();
- }
- this.aggregatorFactory = aggregatorFactory;
- this.keyFields = keyFields;
- this.comparatorFactories = comparatorFactories;
-
- this.sortOutput = sortOutput;
-
- this.tpcf = tpcf;
-
- this.tableSize = tableSize;
-
- // Set the record descriptor. Note that since this operator is a unary
- // operator,
- // only the first record descritpor is used here.
- recordDescriptors[0] = recordDescriptor;
- }
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- /*
- * (non-Javadoc)
- *
- * @see
- * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeTaskGraph
- * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
- */
- @Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- PartialAggregateActivity partialAggAct = new PartialAggregateActivity();
- MergeActivity mergeAct = new MergeActivity();
-
- builder.addTask(partialAggAct);
- builder.addSourceEdge(0, partialAggAct, 0);
-
- builder.addTask(mergeAct);
- builder.addTargetEdge(0, mergeAct, 0);
-
- // FIXME Block or not?
- builder.addBlockingEdge(partialAggAct, mergeAct);
-
- }
-
- private class PartialAggregateActivity extends AbstractActivityNode {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
- final IOperatorEnvironment env, final IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) {
- // Create the in-memory hash table
- final SpillableGroupingHashTable gTable = new SpillableGroupingHashTable(ctx, keyFields,
- comparatorFactories, tpcf, aggregatorFactory, recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0), recordDescriptors[0],
- // Always take one frame for the input records
- framesLimit - 1, tableSize);
- // Create the tuple accessor
- final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
- // Create the partial aggregate activity node
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-
- /**
- * Run files
- */
- private LinkedList<RunFileReader> runs;
-
- @Override
- public void close() throws HyracksDataException {
- if (gTable.getFrameCount() >= 0) {
- if (runs.size() <= 0) {
- // All in memory
- env.set(GROUPTABLES, gTable);
- } else {
- // flush the memory into the run file.
- flushFramesToRun();
- }
- }
- env.set(RUNS, runs);
- }
-
- @Override
- public void flush() throws HyracksDataException {
-
- }
-
- /**
- * Process the next input buffer.
- * The actual insertion is processed in {@link #gTable}. It will
- * check whether it is possible to contain the data into the
- * main memory or not. If not, it will indicate the operator to
- * flush the content of the table into a run file.
- */
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- // If the group table is too large, flush the table into
- // a run file.
- if (!gTable.insert(accessor, i)) {
- flushFramesToRun();
- if (!gTable.insert(accessor, i))
- throw new HyracksDataException(
- "Failed to insert a new buffer into the aggregate operator!");
- }
- }
-
- }
-
- @Override
- public void open() throws HyracksDataException {
- runs = new LinkedList<RunFileReader>();
- gTable.reset();
- }
-
- /**
- * Flush the content of the group table into a run file.
- * During the flushing, the hash table will be sorted as first.
- * After that, a run file handler is initialized and the hash
- * table is flushed into the run file.
- *
- * @throws HyracksDataException
- */
- private void flushFramesToRun() throws HyracksDataException {
- // Sort the contents of the hash table.
- gTable.sortFrames();
- FileReference runFile;
- try {
- runFile = ctx.getJobletContext().createWorkspaceFile(
- ExternalHashGroupOperatorDescriptor.class.getSimpleName());
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
- writer.open();
- try {
- gTable.flushFrames(writer, true);
- } catch (Exception ex) {
- throw new HyracksDataException(ex);
- } finally {
- writer.close();
- }
- gTable.reset();
- runs.add(((RunFileWriter) writer).createReader());
- LOGGER.warning("Created run file: " + runFile.getFile().getAbsolutePath());
- }
-
- };
-
- return op;
- }
-
- @Override
- public IOperatorDescriptor getOwner() {
- return ExternalHashGroupOperatorDescriptor.this;
- }
-
- }
-
- private class MergeActivity extends AbstractActivityNode {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
- final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- /**
- * Input frames, one for each run file.
- */
- private List<ByteBuffer> inFrames;
-
- /**
- * Output frame.
- */
- private ByteBuffer outFrame;
-
- /**
- * List of the run files to be merged
- */
- LinkedList<RunFileReader> runs;
-
- /**
- * Tuple appender for the output frame {@link #outFrame}.
- */
- private FrameTupleAppender outFrameAppender;
-
- private ISpillableAccumulatingAggregator visitingAggregator;
- private ArrayTupleBuilder visitingKeyTuple;
-
- @SuppressWarnings("unchecked")
- @Override
- public void initialize() throws HyracksDataException {
- runs = (LinkedList<RunFileReader>) env.get(RUNS);
- writer.open();
-
- try {
- if (runs.size() <= 0) {
- // If the aggregate results can be fit into
- // memory...
- SpillableGroupingHashTable gTable = (SpillableGroupingHashTable) env.get(GROUPTABLES);
- if (gTable != null) {
- gTable.flushFrames(writer, sortOutput);
- }
- env.set(GROUPTABLES, null);
- } else {
- // Otherwise, merge the run files into a single file
- inFrames = new ArrayList<ByteBuffer>();
- outFrame = ctx.allocateFrame();
- outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
- outFrameAppender.reset(outFrame, true);
- for (int i = 0; i < framesLimit - 1; ++i) {
- inFrames.add(ctx.allocateFrame());
- }
- int passCount = 0;
- while (runs.size() > 0) {
- passCount++;
- try {
- doPass(runs, passCount);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- } finally {
- writer.close();
- }
- env.set(RUNS, null);
- }
-
- /**
- * Merge the run files once.
- *
- * @param runs
- * @param passCount
- * @throws HyracksDataException
- * @throws IOException
- */
- private void doPass(LinkedList<RunFileReader> runs, int passCount) throws HyracksDataException,
- IOException {
- FileReference newRun = null;
- IFrameWriter writer = this.writer;
- boolean finalPass = false;
-
- int[] storedKeys = new int[keyFields.length];
- // Get the list of the fields in the stored records.
- for (int i = 0; i < keyFields.length; ++i) {
- storedKeys[i] = i;
- }
-
- // Release the space not used
- if (runs.size() + 1 <= framesLimit) {
- // If there are run files no more than the available
- // frame slots...
- // No run file to be generated, since the result can be
- // directly
- // outputted into the output frame for write.
- finalPass = true;
- for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
- inFrames.remove(i);
- }
- } else {
- // Otherwise, a new run file will be created
- newRun = ctx.getJobletContext().createWorkspaceFile(
- ExternalHashGroupOperatorDescriptor.class.getSimpleName());
- writer = new RunFileWriter(newRun, ctx.getIOManager());
- writer.open();
- }
- try {
- // Create run file read handler for each input frame
- RunFileReader[] runFileReaders = new RunFileReader[inFrames.size()];
- // Create input frame accessor
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
- Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
- recordDescriptors[0], inFrames.size(), comparator);
- // For the index of tuples visited in each frame.
- int[] tupleIndexes = new int[inFrames.size()];
- for (int i = 0; i < inFrames.size(); i++) {
- tupleIndexes[i] = 0;
- int runIndex = topTuples.peek().getRunid();
- runFileReaders[runIndex] = runs.get(runIndex);
- runFileReaders[runIndex].open();
- // Load the first frame of the file into the main
- // memory
- if (runFileReaders[runIndex].nextFrame(inFrames.get(runIndex))) {
- // initialize the tuple accessor for the frame
- tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescriptors[0]);
- tupleAccessors[runIndex].reset(inFrames.get(runIndex));
- setNextTopTuple(runIndex, tupleIndexes, runFileReaders, tupleAccessors, topTuples);
- } else {
- closeRun(runIndex, runFileReaders, tupleAccessors);
- }
- }
- // Merge
- // Get a key holder for the current working
- // aggregator keys
- visitingAggregator = null;
- visitingKeyTuple = null;
- // Loop on all run files, and update the key
- // holder.
- while (!topTuples.areRunsExhausted()) {
- // Get the top record
- ReferenceEntry top = topTuples.peek();
- int tupleIndex = top.getTupleIndex();
- int runIndex = topTuples.peek().getRunid();
- FrameTupleAccessor fta = top.getAccessor();
- if (visitingAggregator == null) {
- // Initialize the aggregator
- visitingAggregator = aggregatorFactory.createSpillableAggregator(ctx,
- recordDescriptors[0], recordDescriptors[0]);
- // Initialize the partial aggregation result
- visitingAggregator.initFromPartial(fta, tupleIndex, keyFields);
- visitingKeyTuple = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
- for (int i = 0; i < keyFields.length; i++) {
- visitingKeyTuple.addField(fta, tupleIndex, keyFields[i]);
- }
- } else {
- if (compareTupleWithFrame(visitingKeyTuple, fta, tupleIndex, storedKeys, keyFields,
- comparators) == 0) {
- // If the two partial results are on the
- // same key
- visitingAggregator.accumulatePartialResult(fta, tupleIndex, keyFields);
- } else {
- // Otherwise, write the partial result back
- // to the output frame
- if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
- throw new IllegalStateException();
- }
- }
- // Reset the partial aggregation result
- visitingAggregator.initFromPartial(fta, tupleIndex, keyFields);
- visitingKeyTuple.reset();
- for (int i = 0; i < keyFields.length; i++) {
- visitingKeyTuple.addField(fta, tupleIndex, keyFields[i]);
- }
- }
- }
- tupleIndexes[runIndex]++;
- setNextTopTuple(runIndex, tupleIndexes, runFileReaders, tupleAccessors, topTuples);
- }
- // Output the last aggregation result in the frame
- if (visitingAggregator != null) {
- if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
- throw new IllegalStateException();
- }
- }
- }
- // Output data into run file writer after all tuples
- // have been checked
- if (outFrameAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- }
- // empty the input frames
- runs.subList(0, inFrames.size()).clear();
- // insert the new run file into the beginning of the run
- // file list
- if (!finalPass) {
- runs.add(0, ((RunFileWriter) writer).createReader());
- }
- } catch (Exception ex) {
- throw new HyracksDataException(ex);
- } finally {
- if (!finalPass) {
- writer.close();
- }
- }
- }
-
- /**
- * Insert the tuple into the priority queue.
- *
- * @param runIndex
- * @param tupleIndexes
- * @param runCursors
- * @param tupleAccessors
- * @param topTuples
- * @throws IOException
- */
- private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws IOException {
- boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
- if (exists) {
- topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
- } else {
- topTuples.pop();
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- /**
- * Check whether there are any more tuples to be checked for the
- * given run file from the corresponding input frame.
- * If the input frame for this run file is exhausted, load a new
- * frame of the run file into the input frame.
- *
- * @param runIndex
- * @param tupleIndexes
- * @param runCursors
- * @param tupleAccessors
- * @return
- * @throws IOException
- */
- private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors) throws IOException {
-
- if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
- /*
- * Return false if the targeting run file is not
- * available, or the frame for the run file is not
- * available.
- */
- return false;
- } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
- /*
- * If all tuples in the targeting frame have been
- * checked.
- */
- ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
- // Refill the buffer with contents from the run file.
- if (runCursors[runIndex].nextFrame(buf)) {
- tupleIndexes[runIndex] = 0;
- return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
- } else {
- return false;
- }
- } else {
- return true;
- }
- }
-
- /**
- * Close the run file, and also the corresponding readers and
- * input frame.
- *
- * @param index
- * @param runCursors
- * @param tupleAccessor
- * @throws HyracksDataException
- */
- private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
- throws HyracksDataException {
- runCursors[index].close();
- runCursors[index] = null;
- tupleAccessor[index] = null;
- }
-
- /**
- * Compare a tuple (in the format of a {@link ArrayTupleBuilder} ) with a record in a frame (in the format of a {@link FrameTupleAccessor}). Comparing keys and comparators
- * are specified for this method as inputs.
- *
- * @param tuple0
- * @param accessor1
- * @param tIndex1
- * @param keys0
- * @param keys1
- * @param comparators
- * @return
- */
- private int compareTupleWithFrame(ArrayTupleBuilder tuple0, FrameTupleAccessor accessor1, int tIndex1,
- int[] keys0, int[] keys1, IBinaryComparator[] comparators) {
- int tStart1 = accessor1.getTupleStartOffset(tIndex1);
- int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
- for (int i = 0; i < keys0.length; ++i) {
- int fIdx0 = keys0[i];
- int fStart0 = (i == 0 ? 0 : tuple0.getFieldEndOffsets()[fIdx0 - 1]);
- int fEnd0 = tuple0.getFieldEndOffsets()[fIdx0];
- int fLen0 = fEnd0 - fStart0;
-
- int fIdx1 = keys1[i];
- int fStart1 = accessor1.getFieldStartOffset(tIndex1, fIdx1);
- int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
- int fLen1 = fEnd1 - fStart1;
-
- int c = comparators[i].compare(tuple0.getByteArray(), fStart0, fLen0, accessor1.getBuffer()
- .array(), fStart1 + fStartOffset1, fLen1);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
- };
- return op;
- }
-
- @Override
- public IOperatorDescriptor getOwner() {
- return ExternalHashGroupOperatorDescriptor.this;
- }
-
- private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
- return new Comparator<ReferenceEntry>() {
- public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
- FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
- int j1 = (Integer) tp1.getTupleIndex();
- int j2 = (Integer) tp2.getTupleIndex();
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < keyFields.length; ++f) {
- int fIdx = keyFields[f];
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
- };
- }
-
- }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
new file mode 100644
index 0000000..a8b61fc
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
@@ -0,0 +1,448 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
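+/**
+ * Builds a spillable group table backed by a serializable hash table:
+ * tuples are hashed into buckets by the supplied partition computer,
+ * aggregated in place, and the whole table can be sorted on the grouping
+ * keys and flushed as a run when the frame budget is exhausted.
+ */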
+public class HashSpillableGroupingTableFactory implements ISpillableTableFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final ITuplePartitionComputerFactory tpcf;
+ private final int tableSize;
+
+ public HashSpillableGroupingTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize) {
+ this.tpcf = tpcf;
+ this.tableSize = tableSize;
+ }
+
+ @Override
+ public ISpillableTable buildSpillableTable(final IHyracksStageletContext ctx, final int[] keyFields,
+ final IBinaryComparatorFactory[] comparatorFactories,
+ final INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ final IAggregatorDescriptorFactory aggregateDescriptorFactory, final RecordDescriptor inRecordDescriptor,
+ final RecordDescriptor outRecordDescriptor, final int framesLimit) throws HyracksDataException {
+ final int[] storedKeys = new int[keyFields.length];
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
+ for (int i = 0; i < keyFields.length; i++) {
+ storedKeys[i] = i;
+ storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
+ }
+
+ RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
+ final FrameTupleAccessor storedKeysAccessor1;
+ final FrameTupleAccessor storedKeysAccessor2;
+ if (keyFields.length >= outRecordDescriptor.getFields().length) {
+ // Keys-only (zero-aggregation) case: append one extra field to
+ // the internal record so the aggregator still has a field slot
+ // to write into.
+ ISerializerDeserializer<?>[] fields = outRecordDescriptor.getFields();
+ ITypeTrait[] types = outRecordDescriptor.getTypeTraits();
+ ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
+ for (int i = 0; i < fields.length; i++)
+ newFields[i] = fields[i];
+ ITypeTrait[] newTypes = null;
+ if (types != null) {
+ newTypes = new ITypeTrait[types.length + 1];
+ for (int i = 0; i < types.length; i++)
+ newTypes[i] = types[i];
+ }
+ internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
+ }
+ storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
+ storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
+
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+
+ final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(keyFields, storedKeys, comparators);
+ final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(storedKeys, storedKeys, comparators);
+ final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ final ITuplePartitionComputer tpc = tpcf.createPartitioner();
+ final ByteBuffer outFrame = ctx.allocateFrame();
+
+ final ArrayTupleBuilder internalTupleBuilder;
+ if (keyFields.length < outRecordDescriptor.getFields().length)
+ internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+ else
+ internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
+ final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+ final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
+ .createNormalizedKeyComputer();
+
+ return new ISpillableTable() {
+ private int dataFrameCount;
+ private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+ private final TuplePointer storedTuplePointer = new TuplePointer();
+ private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+ private int groupSize = 0;
+ private IAggregatorDescriptor aggregator = aggregateDescriptorFactory.createAggregator(ctx,
+ inRecordDescriptor, outRecordDescriptor, keyFields);
+
+ /**
+ * A tuple is "pointed" to by 3 entries in the tPointers array. [0]
+ * = Frame index in the "Frames" list, [1] = Tuple index in the
+ * frame, [2] = Poor man's normalized key for the tuple.
+ */
+ private int[] tPointers;
+
+ @Override
+ public void reset() {
+ groupSize = 0;
+ dataFrameCount = -1;
+ tPointers = null;
+ table.reset();
+ aggregator.close();
+ }
+
+ @Override
+ public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ if (dataFrameCount < 0)
+ nextAvailableFrame();
+ int entry = tpc.partition(accessor, tIndex, tableSize);
+ boolean foundGroup = false;
+ int offset = 0;
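+ // Probe the hash table: walk the bucket's chain and compare the
+ // stored keys against the incoming tuple to find an existing group.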
+ do {
+ table.getTuplePointer(entry, offset++, storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));
+ int c = ftpcPartial.compare(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex);
+ if (c == 0) {
+ foundGroup = true;
+ break;
+ }
+ } while (true);
+
+ if (!foundGroup) {
+ // No matching group found: build a tuple for the new group
+ // and initialize its aggregate state
+ internalTupleBuilder.reset();
+ for (int i = 0; i < keyFields.length; i++) {
+ internalTupleBuilder.addField(accessor, tIndex, keyFields[i]);
+ }
+ aggregator.init(accessor, tIndex, internalTupleBuilder);
+ if (!appender.append(internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+ if (!nextAvailableFrame()) {
+ return false;
+ } else {
+ if (!appender.append(internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to append an initialized group to an empty frame");
+ }
+ }
+ }
+
+ storedTuplePointer.frameIndex = dataFrameCount;
+ storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
+ table.insert(entry, storedTuplePointer);
+ groupSize++;
+ } else {
+ // A matching group exists: aggregate directly into its stored record
+ int tupleOffset = storedKeysAccessor1.getTupleStartOffset(storedTuplePointer.tupleIndex);
+ int aggFieldOffset = storedKeysAccessor1.getFieldStartOffset(storedTuplePointer.tupleIndex,
+ keyFields.length);
+ int aggFieldLength = storedKeysAccessor1.getFieldLength(storedTuplePointer.tupleIndex,
+ keyFields.length);
+ aggregator.aggregate(accessor, tIndex, storedKeysAccessor1.getBuffer().array(), tupleOffset
+ + storedKeysAccessor1.getFieldSlotsLength() + aggFieldOffset, aggFieldLength);
+ }
+ return true;
+ }
+
+ @Override
+ public List<ByteBuffer> getFrames() {
+ return frames;
+ }
+
+ @Override
+ public int getFrameCount() {
+ return dataFrameCount;
+ }
+
+ @Override
+ public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
+ FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ writer.open();
+ appender.reset(outFrame, true);
+ if (tPointers == null) {
+ // Not sorted
+ for (int i = 0; i < tableSize; ++i) {
+ int entry = i;
+ int offset = 0;
+ do {
+ table.getTuplePointer(entry, offset++, storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ int bIndex = storedTuplePointer.frameIndex;
+ int tIndex = storedTuplePointer.tupleIndex;
+ storedKeysAccessor1.reset(frames.get(bIndex));
+ // Reset the tuple for the partial result
+ outputTupleBuilder.reset();
+ for (int k = 0; k < keyFields.length; k++) {
+ outputTupleBuilder.addField(storedKeysAccessor1, tIndex, k);
+ }
+ if (isPartial)
+ aggregator.outputPartialResult(storedKeysAccessor1, tIndex, outputTupleBuilder);
+ else
+ aggregator.outputResult(storedKeysAccessor1, tIndex, outputTupleBuilder);
+ while (!appender.append(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ }
+ } while (true);
+ }
+ if (appender.getTupleCount() != 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ aggregator.close();
+ return;
+ }
+ int n = tPointers.length / 3;
+ for (int ptr = 0; ptr < n; ptr++) {
+ int tableIndex = tPointers[ptr * 3];
+ int rowIndex = tPointers[ptr * 3 + 1];
+ table.getTuplePointer(tableIndex, rowIndex, storedTuplePointer);
+ int frameIndex = storedTuplePointer.frameIndex;
+ int tupleIndex = storedTuplePointer.tupleIndex;
+ // Get the frame containing the value
+ ByteBuffer buffer = frames.get(frameIndex);
+ storedKeysAccessor1.reset(buffer);
+
+ outputTupleBuilder.reset();
+ for (int k = 0; k < keyFields.length; k++) {
+ outputTupleBuilder.addField(storedKeysAccessor1, tupleIndex, k);
+ }
+ if (isPartial)
+ aggregator.outputPartialResult(storedKeysAccessor1, tupleIndex, outputTupleBuilder);
+ else
+ aggregator.outputResult(storedKeysAccessor1, tupleIndex, outputTupleBuilder);
+ if (!appender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ if (!appender.append(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ aggregator.close();
+ }
+
+ /**
+ * Set the working frame to the next available frame in the frame
+ * list. There are two cases:<br>
+ * 1) If the next frame is not initialized, allocate a new frame.<br>
+ * 2) When frames are already created, they are recycled.
+ *
+ * @return Whether a new frame is added successfully.
+ */
+ private boolean nextAvailableFrame() {
+ // Return false if the number of frames is equal to the limit.
+ if (dataFrameCount + 1 >= framesLimit)
+ return false;
+
+ if (frames.size() < framesLimit) {
+ // Insert a new frame
+ ByteBuffer frame = ctx.allocateFrame();
+ frame.position(0);
+ frame.limit(frame.capacity());
+ frames.add(frame);
+ appender.reset(frame, true);
+ dataFrameCount = frames.size() - 1;
+ } else {
+ // Reuse an old frame
+ dataFrameCount++;
+ ByteBuffer frame = frames.get(dataFrameCount);
+ frame.position(0);
+ frame.limit(frame.capacity());
+ appender.reset(frame, true);
+ }
+ return true;
+ }
+
+ @Override
+ public void sortFrames() {
+ int sfIdx = storedKeys[0];
+ int totalTCount = table.getTupleCount();
+ tPointers = new int[totalTCount * 3];
+ int ptr = 0;
+
+ for (int i = 0; i < tableSize; i++) {
+ int entry = i;
+ int offset = 0;
+ do {
+ table.getTuplePointer(entry, offset, storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ tPointers[ptr * 3] = entry;
+ tPointers[ptr * 3 + 1] = offset;
+ int fIndex = storedTuplePointer.frameIndex;
+ int tIndex = storedTuplePointer.tupleIndex;
+ storedKeysAccessor1.reset(frames.get(fIndex));
+ int tStart = storedKeysAccessor1.getTupleStartOffset(tIndex);
+ int f0StartRel = storedKeysAccessor1.getFieldStartOffset(tIndex, sfIdx);
+ int f0EndRel = storedKeysAccessor1.getFieldEndOffset(tIndex, sfIdx);
+ int f0Start = f0StartRel + tStart + storedKeysAccessor1.getFieldSlotsLength();
+ tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc.normalize(storedKeysAccessor1.getBuffer()
+ .array(), f0Start, f0EndRel - f0StartRel);
+ ptr++;
+ offset++;
+ } while (true);
+ }
+ // Sort using quick sort
+ if (tPointers.length > 0) {
+ sort(tPointers, 0, totalTCount);
+ }
+ }
+
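+ /**
+ * Three-way (fat-pivot) quicksort over the tPointers triplets,
+ * using the normalized key as a cheap first comparison and falling
+ * back to a full key comparison only on ties.
+ */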
+ private void sort(int[] tPointers, int offset, int length) {
+ int m = offset + (length >> 1);
+ // Get table index
+ int mTable = tPointers[m * 3];
+ int mRow = tPointers[m * 3 + 1];
+ int mNormKey = tPointers[m * 3 + 2];
+ // Get frame and tuple index
+ table.getTuplePointer(mTable, mRow, storedTuplePointer);
+ int mFrame = storedTuplePointer.frameIndex;
+ int mTuple = storedTuplePointer.tupleIndex;
+ storedKeysAccessor1.reset(frames.get(mFrame));
+
+ int a = offset;
+ int b = a;
+ int c = offset + length - 1;
+ int d = c;
+ while (true) {
+ while (b <= c) {
+ int bTable = tPointers[b * 3];
+ int bRow = tPointers[b * 3 + 1];
+ int bNormKey = tPointers[b * 3 + 2];
+ int cmp = 0;
+ if (bNormKey != mNormKey) {
+ cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+ } else {
+ table.getTuplePointer(bTable, bRow, storedTuplePointer);
+ int bFrame = storedTuplePointer.frameIndex;
+ int bTuple = storedTuplePointer.tupleIndex;
+ storedKeysAccessor2.reset(frames.get(bFrame));
+ cmp = ftpcTuple.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
+ }
+ if (cmp > 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, a++, b);
+ }
+ ++b;
+ }
+ while (c >= b) {
+ int cTable = tPointers[c * 3];
+ int cRow = tPointers[c * 3 + 1];
+ int cNormKey = tPointers[c * 3 + 2];
+ int cmp = 0;
+ if (cNormKey != mNormKey) {
+ cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+ } else {
+ table.getTuplePointer(cTable, cRow, storedTuplePointer);
+ int cFrame = storedTuplePointer.frameIndex;
+ int cTuple = storedTuplePointer.tupleIndex;
+ storedKeysAccessor2.reset(frames.get(cFrame));
+ cmp = ftpcTuple.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
+ }
+ if (cmp < 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, c, d--);
+ }
+ --c;
+ }
+ if (b > c)
+ break;
+ swap(tPointers, b++, c--);
+ }
+
+ int s;
+ int n = offset + length;
+ s = Math.min(a - offset, b - a);
+ vecswap(tPointers, offset, b - s, s);
+ s = Math.min(d - c, n - d - 1);
+ vecswap(tPointers, b, n - s, s);
+
+ if ((s = b - a) > 1) {
+ sort(tPointers, offset, s);
+ }
+ if ((s = d - c) > 1) {
+ sort(tPointers, n - s, s);
+ }
+ }
+
+ private void swap(int x[], int a, int b) {
+ for (int i = 0; i < 3; ++i) {
+ int t = x[a * 3 + i];
+ x[a * 3 + i] = x[b * 3 + i];
+ x[b * 3 + i] = t;
+ }
+ }
+
+ private void vecswap(int x[], int a, int b, int n) {
+ for (int i = 0; i < n; i++, a++, b++) {
+ swap(x, a, b);
+ }
+ }
+
+ @Override
+ public void close() {
+ groupSize = 0;
+ dataFrameCount = -1;
+ tPointers = null;
+ table.close();
+ frames.clear();
+ }
+ };
+ }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
index 3fc7d79..8eff82f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
@@ -24,6 +24,4 @@
IAccumulatingAggregator createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor) throws HyracksDataException;
- ISpillableAccumulatingAggregator createSpillableAggregator(IHyracksStageletContext ctx,
- RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor) throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java
deleted file mode 100644
index 59c69eb..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * A spillable version of the {@link IAccumulatingAggregator} supporting
- * external aggregation.
- */
-public interface ISpillableAccumulatingAggregator extends IAccumulatingAggregator {
-
- public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
- throws HyracksDataException;
-
- /**
- * @param accessor
- * @param tIndex
- * @throws HyracksDataException
- */
- public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
- throws HyracksDataException;
-
- public boolean output(FrameTupleAppender appender, ArrayTupleBuilder tbder) throws HyracksDataException;
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
new file mode 100644
index 0000000..7bd8cd8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * @author jarodwen
+ */
+public interface ISpillableTable {
+
+ public void close();
+
+ public void reset();
+
+ public int getFrameCount();
+
+ public List<ByteBuffer> getFrames();
+
+ public void sortFrames();
+
+ public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
+
+ public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException;
+}
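
ISpillableTable is meant to be driven by a grouping operator that inserts tuples until insert() reports the in-memory table is full, then sorts and flushes a partial run and resets the table before retrying. A hedged sketch of that driver step under those assumptions; SpillableTableDriverSketch and runWriter are illustrative names, not part of the patch:

    package edu.uci.ics.hyracks.dataflow.std.group;

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

    public class SpillableTableDriverSketch {
        public static void insertOrSpill(ISpillableTable table, FrameTupleAccessor accessor, int tupleIndex,
                IFrameWriter runWriter) throws HyracksDataException {
            if (!table.insert(accessor, tupleIndex)) {
                // In-memory table is full: sort, flush a partial run, reset, then retry.
                table.sortFrames();
                table.flushFrames(runWriter, true);
                table.reset();
                if (!table.insert(accessor, tupleIndex)) {
                    throw new IllegalStateException("Tuple does not fit into an empty table");
                }
            }
        }
    }
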
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java
new file mode 100644
index 0000000..07f0ac0
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
+
+/**
+ * @author jarodwen
+ */
+public interface ISpillableTableFactory extends Serializable {
+ ISpillableTable buildSpillableTable(IHyracksStageletContext ctx, int[] keyFields,
+ IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
+ IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java
deleted file mode 100644
index b3b9f24..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java
+++ /dev/null
@@ -1,569 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-
-/**
- * An in-mem hash table for spillable grouping operations.
- * A table of {@link #Link}s is maintained in this object, and each row
- * of this table represents a hash partition.
- */
-public class SpillableGroupingHashTable {
-
- /**
- * Context.
- */
- private final IHyracksStageletContext ctx;
-
- /**
- * Columns for group-by
- */
- private final int[] fields;
-
- /**
- * Key fields of records in the hash table (starting from 0
- * to the number of the key fields).
- * This is different from the key fields in the input records,
- * since these fields are extracted when being inserted into
- * the hash table.
- */
- private final int[] storedKeys;
-
- /**
- * Comparators: one for each column in {@link #groupFields}
- */
- private final IBinaryComparator[] comparators;
-
- /**
- * Record descriptor for the input tuple.
- */
- private final RecordDescriptor inRecordDescriptor;
-
- /**
- * Record descriptor for the partial aggregation result.
- */
- private final RecordDescriptor outputRecordDescriptor;
-
- /**
- * Accumulators in the main memory.
- */
- private ISpillableAccumulatingAggregator[] accumulators;
-
- /**
- * The hashing group table containing pointers to aggregators and also the
- * corresponding key tuples. So for each entry, there will be three integer
- * fields:
- * 1. The frame index containing the key tuple; 2. The tuple index inside of
- * the frame for the key tuple; 3. The index of the aggregator.
- * Note that each link in the table is a partition for the input records. Multiple
- * records in the same partition based on the {@link #tpc} are stored as
- * pointers.
- */
- private final Link[] table;
-
- /**
- * Number of accumulators.
- */
- private int accumulatorSize = 0;
-
- /**
- * Factory for the aggregators.
- */
- private final IAccumulatingAggregatorFactory aggregatorFactory;
-
- private final List<ByteBuffer> frames;
-
- private final ByteBuffer outFrame;
-
- /**
- * Frame appender for output frames in {@link #frames}.
- */
- private final FrameTupleAppender appender;
-
- /**
- * The count of used frames in the table.
- * Note that this cannot be replaced by {@link #frames} since frames will
- * not be removed after being created.
- */
- private int dataFrameCount;
-
- /**
- * Pointers for the sorted aggregators
- */
- private int[] tPointers;
-
- private static final int INIT_ACCUMULATORS_SIZE = 8;
-
- /**
- * The maximum number of frames available for this hashing group table.
- */
- private final int framesLimit;
-
- private final FrameTuplePairComparator ftpc;
-
- /**
- * A partition computer to partition the hashing group table.
- */
- private final ITuplePartitionComputer tpc;
-
- /**
- * Accessors for the tuples. Two accessors are necessary during the sort.
- */
- private final FrameTupleAccessor storedKeysAccessor1;
- private final FrameTupleAccessor storedKeysAccessor2;
-
- /**
- * Create a spillable grouping hash table.
- *
- * @param ctx
- * The context of the job.
- * @param fields
- * Fields of keys for grouping.
- * @param comparatorFactories
- * The comparators.
- * @param tpcf
- * The partitioners. These are used to partition the incoming records into proper partition of the hash table.
- * @param aggregatorFactory
- * The aggregators.
- * @param inRecordDescriptor
- * Record descriptor for input data.
- * @param outputRecordDescriptor
- * Record descriptor for output data.
- * @param framesLimit
- * The maximum number of frames usable in the memory for hash table.
- * @param tableSize
- * The size of the table, which specifies the number of partitions of the table.
- */
- public SpillableGroupingHashTable(IHyracksStageletContext ctx, int[] fields,
- IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory tpcf,
- IAccumulatingAggregatorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outputRecordDescriptor, int framesLimit, int tableSize) {
- this.ctx = ctx;
- this.fields = fields;
-
- storedKeys = new int[fields.length];
- @SuppressWarnings("rawtypes")
- ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
-
- // Note that after storing a record into the hash table, the index for the fields should
- // be updated. Here we assume that all these key fields are written at the beginning of
- // the record, so their index should start from 0 and end at the length of the key fields.
- for (int i = 0; i < fields.length; ++i) {
- storedKeys[i] = i;
- storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
- }
- RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
- storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
- storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
-
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
-
- this.table = new Link[tableSize];
-
- this.aggregatorFactory = aggregatorFactory;
- accumulators = new ISpillableAccumulatingAggregator[INIT_ACCUMULATORS_SIZE];
-
- this.framesLimit = framesLimit;
-
- // Tuple pair comparator
- ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
-
- // Partitioner
- tpc = tpcf.createPartitioner();
-
- this.inRecordDescriptor = inRecordDescriptor;
- this.outputRecordDescriptor = outputRecordDescriptor;
- frames = new ArrayList<ByteBuffer>();
- appender = new FrameTupleAppender(ctx.getFrameSize());
-
- dataFrameCount = -1;
-
- outFrame = ctx.allocateFrame();
- }
-
- public void reset() {
- dataFrameCount = -1;
- tPointers = null;
- // Reset the grouping hash table
- for (int i = 0; i < table.length; i++) {
- table[i] = new Link();
- }
- }
-
- public int getFrameCount() {
- return dataFrameCount;
- }
-
- /**
- * How to define pointers for the partial aggregation
- *
- * @return
- */
- public int[] getTPointers() {
- return tPointers;
- }
-
- /**
- * Redefine the number of fields in the pointer.
- * Only two pointers are necessary for external grouping: one is to the
- * index of the hash table, and the other is to the row index inside of the
- * hash table.
- *
- * @return
- */
- public int getPtrFields() {
- return 2;
- }
-
- public List<ByteBuffer> getFrames() {
- return frames;
- }
-
- /**
- * Set the working frame to the next available frame in the
- * frame list. There are two cases:<br>
- * 1) If the next frame is not initialized, allocate
- * a new frame.
- * 2) When frames are already created, they are recycled.
- *
- * @return Whether a new frame is added successfully.
- */
- private boolean nextAvailableFrame() {
- // Return false if the number of frames is equal to the limit.
- if (dataFrameCount + 1 >= framesLimit)
- return false;
-
- if (frames.size() < framesLimit) {
- // Insert a new frame
- ByteBuffer frame = ctx.allocateFrame();
- frame.position(0);
- frame.limit(frame.capacity());
- frames.add(frame);
- appender.reset(frame, true);
- dataFrameCount++;
- } else {
- // Reuse an old frame
- dataFrameCount++;
- ByteBuffer frame = frames.get(dataFrameCount);
- frame.position(0);
- frame.limit(frame.capacity());
- appender.reset(frame, true);
- }
- return true;
- }
-
- /**
- * Insert a new record from the input frame.
- *
- * @param accessor
- * @param tIndex
- * @return
- * @throws HyracksDataException
- */
- public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- if (dataFrameCount < 0)
- nextAvailableFrame();
- // Get the partition for the inserting tuple
- int entry = tpc.partition(accessor, tIndex, table.length);
- Link link = table[entry];
- if (link == null) {
- link = table[entry] = new Link();
- }
- // Find the corresponding aggregator from existing aggregators
- ISpillableAccumulatingAggregator aggregator = null;
- for (int i = 0; i < link.size; i += 3) {
- int sbIndex = link.pointers[i];
- int stIndex = link.pointers[i + 1];
- int saIndex = link.pointers[i + 2];
- storedKeysAccessor1.reset(frames.get(sbIndex));
- int c = ftpc.compare(accessor, tIndex, storedKeysAccessor1, stIndex);
- if (c == 0) {
- aggregator = accumulators[saIndex];
- break;
- }
- }
- // Do insert
- if (aggregator == null) {
- // Did not find the aggregator. Insert a new aggregator entry
- if (!appender.appendProjection(accessor, tIndex, fields)) {
- if (!nextAvailableFrame()) {
- // If buffer is full, return false to trigger a run file
- // write
- return false;
- } else {
- // Try to do insert after adding a new frame.
- if (!appender.appendProjection(accessor, tIndex, fields)) {
- throw new IllegalStateException();
- }
- }
- }
- int sbIndex = dataFrameCount;
- int stIndex = appender.getTupleCount() - 1;
- if (accumulatorSize >= accumulators.length) {
- accumulators = Arrays.copyOf(accumulators, accumulators.length * 2);
- }
- int saIndex = accumulatorSize++;
- aggregator = accumulators[saIndex] = aggregatorFactory.createSpillableAggregator(ctx, inRecordDescriptor,
- outputRecordDescriptor);
- aggregator.init(accessor, tIndex);
- link.add(sbIndex, stIndex, saIndex);
- }
- aggregator.accumulate(accessor, tIndex);
- return true;
- }
-
- /**
- * Sort partial results
- */
- public void sortFrames() {
- int totalTCount = 0;
- // Get the number of records
- for (int i = 0; i < table.length; i++) {
- if (table[i] == null)
- continue;
- totalTCount += table[i].size / 3;
- }
- // Start sorting:
- /*
- * Based on the data structure for the partial aggregates, the
- * pointers should be initialized.
- */
- tPointers = new int[totalTCount * getPtrFields()];
- // Initialize pointers
- int ptr = 0;
- // Maintain two pointers to each entry of the hashing group table
- for (int i = 0; i < table.length; i++) {
- if (table[i] == null)
- continue;
- for (int j = 0; j < table[i].size; j = j + 3) {
- tPointers[ptr * getPtrFields()] = i;
- tPointers[ptr * getPtrFields() + 1] = j;
- ptr++;
- }
- }
- // Sort using quick sort
- if (tPointers.length > 0) {
- sort(tPointers, 0, totalTCount);
- }
- }
-
- /**
- * @param writer
- * @throws HyracksDataException
- */
- public void flushFrames(IFrameWriter writer, boolean sorted) throws HyracksDataException {
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-
- ISpillableAccumulatingAggregator aggregator = null;
- writer.open();
- appender.reset(outFrame, true);
- if (sorted) {
- sortFrames();
- }
- if (tPointers == null) {
- // Not sorted
- for (int i = 0; i < table.length; ++i) {
- Link link = table[i];
- if (link != null) {
- for (int j = 0; j < link.size; j += 3) {
- int bIndex = link.pointers[j];
- int tIndex = link.pointers[j + 1];
- int aIndex = link.pointers[j + 2];
- ByteBuffer keyBuffer = frames.get(bIndex);
- storedKeysAccessor1.reset(keyBuffer);
- aggregator = accumulators[aIndex];
- while (!aggregator.output(appender, storedKeysAccessor1, tIndex, storedKeys)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- }
- }
- }
- }
- if (appender.getTupleCount() != 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
- return;
- }
- int n = tPointers.length / getPtrFields();
- for (int ptr = 0; ptr < n; ptr++) {
- int tableIndex = tPointers[ptr * 2];
- int rowIndex = tPointers[ptr * 2 + 1];
- int frameIndex = table[tableIndex].pointers[rowIndex];
- int tupleIndex = table[tableIndex].pointers[rowIndex + 1];
- int aggregatorIndex = table[tableIndex].pointers[rowIndex + 2];
- // Get the frame containing the value
- ByteBuffer buffer = frames.get(frameIndex);
- storedKeysAccessor1.reset(buffer);
-
- // Get the aggregator
- aggregator = accumulators[aggregatorIndex];
- // Insert
- if (!aggregator.output(appender, storedKeysAccessor1, tupleIndex, fields)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!aggregator.output(appender, storedKeysAccessor1, tupleIndex, fields)) {
- throw new IllegalStateException();
- } else {
- accumulators[aggregatorIndex] = null;
- }
- } else {
- accumulators[aggregatorIndex] = null;
- }
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
- }
-
- private void sort(int[] tPointers, int offset, int length) {
- int m = offset + (length >> 1);
- // Get table index
- int mTable = tPointers[m * 2];
- int mRow = tPointers[m * 2 + 1];
- // Get frame and tuple index
- int mFrame = table[mTable].pointers[mRow];
- int mTuple = table[mTable].pointers[mRow + 1];
- storedKeysAccessor1.reset(frames.get(mFrame));
-
- int a = offset;
- int b = a;
- int c = offset + length - 1;
- int d = c;
- while (true) {
- while (b <= c) {
- int bTable = tPointers[b * 2];
- int bRow = tPointers[b * 2 + 1];
- int bFrame = table[bTable].pointers[bRow];
- int bTuple = table[bTable].pointers[bRow + 1];
- storedKeysAccessor2.reset(frames.get(bFrame));
- int cmp = ftpc.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
- // int cmp = compare(tPointers, b, mi, mj, mv);
- if (cmp > 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, a++, b);
- }
- ++b;
- }
- while (c >= b) {
- int cTable = tPointers[c * 2];
- int cRow = tPointers[c * 2 + 1];
- int cFrame = table[cTable].pointers[cRow];
- int cTuple = table[cTable].pointers[cRow + 1];
- storedKeysAccessor2.reset(frames.get(cFrame));
- int cmp = ftpc.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
- // int cmp = compare(tPointers, c, mi, mj, mv);
- if (cmp < 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, c, d--);
- }
- --c;
- }
- if (b > c)
- break;
- swap(tPointers, b++, c--);
- }
-
- int s;
- int n = offset + length;
- s = Math.min(a - offset, b - a);
- vecswap(tPointers, offset, b - s, s);
- s = Math.min(d - c, n - d - 1);
- vecswap(tPointers, b, n - s, s);
-
- if ((s = b - a) > 1) {
- sort(tPointers, offset, s);
- }
- if ((s = d - c) > 1) {
- sort(tPointers, n - s, s);
- }
- }
-
- private void swap(int x[], int a, int b) {
- for (int i = 0; i < 2; ++i) {
- int t = x[a * 2 + i];
- x[a * 2 + i] = x[b * 2 + i];
- x[b * 2 + i] = t;
- }
- }
-
- private void vecswap(int x[], int a, int b, int n) {
- for (int i = 0; i < n; i++, a++, b++) {
- swap(x, a, b);
- }
- }
-
- /**
- * The pointers in the link store 3 int values for each entry in the
- * hashtable: (bufferIdx, tIndex, accumulatorIdx).
- *
- * @author vinayakb
- */
- private static class Link {
- private static final int INIT_POINTERS_SIZE = 9;
-
- int[] pointers;
- int size;
-
- Link() {
- pointers = new int[INIT_POINTERS_SIZE];
- size = 0;
- }
-
- void add(int bufferIdx, int tIndex, int accumulatorIdx) {
- while (size + 3 > pointers.length) {
- pointers = Arrays.copyOf(pointers, pointers.length * 2);
- }
- pointers[size++] = bufferIdx;
- pointers[size++] = tIndex;
- pointers[size++] = accumulatorIdx;
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("[Size=" + size + "]");
- for (int i = 0; i < pointers.length; i = i + 3) {
- sb.append(pointers[i] + ",");
- sb.append(pointers[i + 1] + ",");
- sb.append(pointers[i + 2] + "; ");
- }
- return sb.toString();
- }
- }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 94a9501..d1d528c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -42,7 +42,7 @@
private final ByteBuffer outBuffer;
private final boolean isLeftOuter;
private final ArrayTupleBuilder nullTupleBuild;
-
+
public InMemoryHashJoin(IHyracksStageletContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
@@ -57,7 +57,7 @@
tpComparator = comparator;
outBuffer = ctx.allocateFrame();
appender.reset(outBuffer, true);
- this.isLeftOuter = isLeftOuter;
+ this.isLeftOuter = isLeftOuter;
if (isLeftOuter) {
int fieldCountOuter = accessor1.getFieldCount();
nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
@@ -114,13 +114,14 @@
}
}
if (!matchFound && isLeftOuter) {
- if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+ if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+ nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild
- .getSize())) {
+ if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+ nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
throw new IllegalStateException();
- }
+ }
}
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 7f097e0..e44e8a9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
public class PrinterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -44,7 +45,7 @@
@Override
public void writeData(Object[] data) throws HyracksDataException {
for (int i = 0; i < data.length; ++i) {
- System.err.print(String.valueOf(data[i]));
+ System.err.print(StringSerializationUtils.toString(data[i]));
System.err.print(", ");
}
System.err.println();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 3742e91..710ef38 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -35,14 +35,18 @@
private final int[] sortFields;
private final INormalizedKeyComputer nkc;
private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDescriptor;
private final List<ByteBuffer> buffers;
private final FrameTupleAccessor fta1;
private final FrameTupleAccessor fta2;
+ private final FrameTupleAppender appender;
+
+ private final ByteBuffer outFrame;
+
private int dataFrameCount;
private int[] tPointers;
+ private int tupleCount;
public FrameSorter(IHyracksCommonContext ctx, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
@@ -54,39 +58,24 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- this.recordDescriptor = recordDescriptor;
buffers = new ArrayList<ByteBuffer>();
fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ outFrame = ctx.allocateFrame();
dataFrameCount = 0;
}
public void reset() {
dataFrameCount = 0;
- tPointers = null;
+ tupleCount = 0;
}
public int getFrameCount() {
return dataFrameCount;
}
- /**
- * Gets the sorted tuple pointers.
- * A tuple is "pointed" to by 4 entries in the tPointers array.
- * [0] = Frame index in the "Frames" list.
- * [1] = Start offset of the tuple in the frame
- * [2] = Length of the tuple
- * [3] = Poor man's normalized key for the tuple.
- */
- public int[] getTPointers() {
- return tPointers;
- }
-
- public List<ByteBuffer> getFrames() {
- return buffers;
- }
-
public void insertFrame(ByteBuffer buffer) {
ByteBuffer copyFrame;
if (dataFrameCount == buffers.size()) {
@@ -100,55 +89,50 @@
}
public void sortFrames() {
- FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
int nBuffers = dataFrameCount;
- int totalTCount = 0;
+ tupleCount = 0;
for (int i = 0; i < nBuffers; ++i) {
- accessor.reset(buffers.get(i));
- totalTCount += accessor.getTupleCount();
+ fta1.reset(buffers.get(i));
+ tupleCount += fta1.getTupleCount();
}
int sfIdx = sortFields[0];
- tPointers = new int[totalTCount * 4];
+ tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
int ptr = 0;
for (int i = 0; i < nBuffers; ++i) {
- accessor.reset(buffers.get(i));
- int tCount = accessor.getTupleCount();
- byte[] array = accessor.getBuffer().array();
+ fta1.reset(buffers.get(i));
+ int tCount = fta1.getTupleCount();
+ byte[] array = fta1.getBuffer().array();
for (int j = 0; j < tCount; ++j) {
- int tStart = accessor.getTupleStartOffset(j);
- int tEnd = accessor.getTupleEndOffset(j);
+ int tStart = fta1.getTupleStartOffset(j);
+ int tEnd = fta1.getTupleEndOffset(j);
tPointers[ptr * 4] = i;
tPointers[ptr * 4 + 1] = tStart;
tPointers[ptr * 4 + 2] = tEnd;
- int f0StartRel = accessor.getFieldStartOffset(j, sfIdx);
- int f0EndRel = accessor.getFieldEndOffset(j, sfIdx);
- int f0Start = f0StartRel + tStart + accessor.getFieldSlotsLength();
+ int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
+ int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
+ int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
++ptr;
}
}
- if (tPointers.length > 0) {
- sort(tPointers, 0, totalTCount);
+ if (tupleCount > 0) {
+ sort(tPointers, 0, tupleCount);
}
}
public void flushFrames(IFrameWriter writer) throws HyracksDataException {
- FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- ByteBuffer outFrame = ctx.allocateFrame();
writer.open();
appender.reset(outFrame, true);
- int n = tPointers.length / 4;
- for (int ptr = 0; ptr < n; ++ptr) {
+ for (int ptr = 0; ptr < tupleCount; ++ptr) {
int i = tPointers[ptr * 4];
int tStart = tPointers[ptr * 4 + 1];
int tEnd = tPointers[ptr * 4 + 2];
ByteBuffer buffer = buffers.get(i);
- accessor.reset(buffer);
- if (!appender.append(accessor, tStart, tEnd)) {
+ fta1.reset(buffer);
+ if (!appender.append(fta1, tStart, tEnd)) {
FrameUtils.flushFrame(outFrame, writer);
appender.reset(outFrame, true);
- if (!appender.append(accessor, tStart, tEnd)) {
+ if (!appender.append(fta1, tStart, tEnd)) {
throw new IllegalStateException();
}
}
@@ -255,4 +239,4 @@
}
return 0;
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/ISerializableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/ISerializableTable.java
new file mode 100644
index 0000000..7dc0b17
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/ISerializableTable.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface ISerializableTable {
+
+ public void insert(int entry, TuplePointer tuplePointer);
+
+ public void getTuplePointer(int entry, int offset, TuplePointer tuplePointer);
+
+ public int getFrameCount();
+
+ public int getTupleCount();
+
+ public void reset();
+
+ public void close();
+}
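
getTuplePointer signals a missing slot by setting frameIndex (and tupleIndex) to -1, so a probe walks a bucket by increasing offset until it sees that sentinel. A small sketch of such a probe, placed in the same package for illustration; the class name is hypothetical:

    package edu.uci.ics.hyracks.dataflow.std.structures;

    public class SerializableTableProbeSketch {
        // Count how many tuple pointers are stored under one hash bucket.
        public static int bucketSize(ISerializableTable table, int bucket) {
            TuplePointer tp = new TuplePointer();
            int offset = 0;
            table.getTuplePointer(bucket, offset, tp);
            while (tp.frameIndex >= 0) {
                offset++;
                table.getTuplePointer(bucket, offset, tp);
            }
            return offset;
        }
    }
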
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
new file mode 100644
index 0000000..9e8cf00
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -0,0 +1,267 @@
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+
+/**
+ * An entry in the table is: #elements, #non-empty elements; fIndex, tIndex;
+ * fIndex, tIndex; ...; each <fIndex, tIndex> pair forms a tuple pointer.
+ */
+public class SerializableHashTable implements ISerializableTable {
+
+ private static final int INT_SIZE = 4;
+ private static final int INIT_ENTRY_SIZE = 4;
+
+ private IntSerDeBuffer[] headers;
+ private List<IntSerDeBuffer> contents = new ArrayList<IntSerDeBuffer>();
+ private List<Integer> frameCurrentIndex = new ArrayList<Integer>();
+ private final IHyracksStageletContext ctx;
+ private int frameCapacity = 0;
+ private int currentLargestFrameIndex = 0;
+ private int tupleCount = 0;
+ private int headerFrameCount = 0;
+ private TuplePointer tempTuplePointer = new TuplePointer();
+
+ public SerializableHashTable(int tableSize, final IHyracksStageletContext ctx) {
+ this.ctx = ctx;
+ int frameSize = ctx.getFrameSize();
+
+ int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1;
+ int headerSize = tableSize * INT_SIZE * 2 / frameSize + residual;
+ headers = new IntSerDeBuffer[headerSize];
+
+ IntSerDeBuffer frame = new IntSerDeBuffer(ctx.allocateFrame().array());
+ contents.add(frame);
+ frameCurrentIndex.add(0);
+ frameCapacity = frame.capacity();
+ }
+
+ @Override
+ public void insert(int entry, TuplePointer pointer) {
+ int hFrameIndex = getHeaderFrameIndex(entry);
+ int headerOffset = getHeaderFrameOffset(entry);
+ IntSerDeBuffer header = headers[hFrameIndex];
+ if (header == null) {
+ header = new IntSerDeBuffer(ctx.allocateFrame().array());
+ headers[hFrameIndex] = header;
+ resetFrame(header);
+ headerFrameCount++;
+ }
+ int frameIndex = header.getInt(headerOffset);
+ int offsetIndex = header.getInt(headerOffset + 1);
+ if (frameIndex < 0) {
+ // insert first tuple into the entry
+ insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer);
+ } else {
+ // insert non-first tuple into the entry
+ insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, pointer);
+ }
+ tupleCount++;
+ }
+
+ @Override
+ public void getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
+ int hFrameIndex = getHeaderFrameIndex(entry);
+ int headerOffset = getHeaderFrameOffset(entry);
+ IntSerDeBuffer header = headers[hFrameIndex];
+ if (header == null) {
+ dataPointer.frameIndex = -1;
+ dataPointer.tupleIndex = -1;
+ return;
+ }
+ int frameIndex = header.getInt(headerOffset);
+ int offsetIndex = header.getInt(headerOffset + 1);
+ if (frameIndex < 0) {
+ dataPointer.frameIndex = -1;
+ dataPointer.tupleIndex = -1;
+ return;
+ }
+ IntSerDeBuffer frame = contents.get(frameIndex);
+ int entryUsedItems = frame.getInt(offsetIndex + 1);
+ if (offset > entryUsedItems - 1) {
+ dataPointer.frameIndex = -1;
+ dataPointer.tupleIndex = -1;
+ return;
+ }
+ int startIndex = offsetIndex + 2 + offset * 2;
+ while (startIndex >= frameCapacity) {
+ ++frameIndex;
+ startIndex -= frameCapacity;
+ }
+ frame = contents.get(frameIndex);
+ dataPointer.frameIndex = frame.getInt(startIndex);
+ dataPointer.tupleIndex = frame.getInt(startIndex + 1);
+ }
+
+ @Override
+ public void reset() {
+ for (IntSerDeBuffer frame : headers)
+ if (frame != null)
+ resetFrame(frame);
+
+ frameCurrentIndex.clear();
+ for (int i = 0; i < contents.size(); i++) {
+ frameCurrentIndex.add(0);
+ }
+
+ currentLargestFrameIndex = 0;
+ tupleCount = 0;
+ }
+
+ @Override
+ public int getFrameCount() {
+ return headerFrameCount + contents.size();
+ }
+
+ public int getTupleCount() {
+ return tupleCount;
+ }
+
+ @Override
+ public void close() {
+ for (int i = 0; i < headers.length; i++)
+ headers[i] = null;
+ contents.clear();
+ frameCurrentIndex.clear();
+ tupleCount = 0;
+ currentLargestFrameIndex = 0;
+ }
+
+ private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer) {
+ IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex);
+ int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex);
+ int requiredIntCapacity = entryCapacity * 2;
+ int startFrameIndex = currentLargestFrameIndex;
+
+ if (lastIndex + requiredIntCapacity >= frameCapacity) {
+ IntSerDeBuffer newFrame;
+ startFrameIndex++;
+ do {
+ if (currentLargestFrameIndex >= contents.size() - 1) {
+ newFrame = new IntSerDeBuffer(ctx.allocateFrame().array());
+ currentLargestFrameIndex++;
+ contents.add(newFrame);
+ frameCurrentIndex.add(0);
+ } else {
+ currentLargestFrameIndex++;
+ frameCurrentIndex.set(currentLargestFrameIndex, 0);
+ }
+ requiredIntCapacity -= frameCapacity;
+ } while (requiredIntCapacity > 0);
+ lastIndex = 0;
+ lastFrame = contents.get(startFrameIndex);
+ }
+
+ // set header
+ header.writeInt(headerOffset, startFrameIndex);
+ header.writeInt(headerOffset + 1, lastIndex);
+
+ // set the entry
+ lastFrame.writeInt(lastIndex, entryCapacity - 1);
+ lastFrame.writeInt(lastIndex + 1, 1);
+ lastFrame.writeInt(lastIndex + 2, pointer.frameIndex);
+ lastFrame.writeInt(lastIndex + 3, pointer.tupleIndex);
+ int newLastIndex = lastIndex + entryCapacity * 2;
+ newLastIndex = newLastIndex < frameCapacity ? newLastIndex : frameCapacity - 1;
+ frameCurrentIndex.set(startFrameIndex, newLastIndex);
+
+ requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex);
+ while (requiredIntCapacity > 0) {
+ startFrameIndex++;
+ requiredIntCapacity -= frameCapacity;
+ newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity : frameCapacity - 1;
+ frameCurrentIndex.set(startFrameIndex, newLastIndex);
+ }
+ }
+
+ private void insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, int frameIndex, int offsetIndex,
+ TuplePointer pointer) {
+ IntSerDeBuffer frame = contents.get(frameIndex);
+ int entryItems = frame.getInt(offsetIndex);
+ int entryUsedItems = frame.getInt(offsetIndex + 1);
+
+ if (entryUsedItems < entryItems) {
+ frame.writeInt(offsetIndex + 1, entryUsedItems + 1);
+ int startIndex = offsetIndex + 2 + entryUsedItems * 2;
+ while (startIndex >= frameCapacity) {
+ ++frameIndex;
+ startIndex -= frameCapacity;
+ }
+ frame = contents.get(frameIndex);
+ frame.writeInt(startIndex, pointer.frameIndex);
+ frame.writeInt(startIndex + 1, pointer.tupleIndex);
+ } else {
+ int capacity = (entryItems + 1) * 2;
+ header.writeInt(headerOffset, -1);
+ header.writeInt(headerOffset + 1, -1);
+ int fIndex = frame.getInt(offsetIndex + 2);
+ int tIndex = frame.getInt(offsetIndex + 3);
+ tempTuplePointer.frameIndex = fIndex;
+ tempTuplePointer.tupleIndex = tIndex;
+ this.insertNewEntry(header, headerOffset, capacity, tempTuplePointer);
+ int newFrameIndex = header.getInt(headerOffset);
+ int newTupleIndex = header.getInt(headerOffset + 1);
+
+ for (int i = 1; i < entryUsedItems; i++) {
+ int startIndex = offsetIndex + 2 + i * 2;
+ int startFrameIndex = frameIndex;
+ while (startIndex >= frameCapacity) {
+ ++startFrameIndex;
+ startIndex -= frameCapacity;
+ }
+ frame = contents.get(startFrameIndex);
+ fIndex = frame.getInt(startIndex);
+ tIndex = frame.getInt(startIndex + 1);
+ tempTuplePointer.frameIndex = fIndex;
+ tempTuplePointer.tupleIndex = tIndex;
+ insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, tempTuplePointer);
+ }
+ insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, pointer);
+ }
+ }
+
+ private void resetFrame(IntSerDeBuffer frame) {
+ for (int i = 0; i < frameCapacity; i++)
+ frame.writeInt(i, -1);
+ }
+
+ private int getHeaderFrameIndex(int entry) {
+ int frameIndex = entry * 2 / frameCapacity;
+ return frameIndex;
+ }
+
+ private int getHeaderFrameOffset(int entry) {
+ int offset = entry * 2 % frameCapacity;
+ return offset;
+ }
+
+}
+
+class IntSerDeBuffer {
+
+ private byte[] bytes;
+
+ public IntSerDeBuffer(byte[] data) {
+ this.bytes = data;
+ }
+
+ public int getInt(int pos) {
+ int offset = pos * 4;
+ return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
+ + ((bytes[offset + 3] & 0xff) << 0);
+ }
+
+ public void writeInt(int pos, int value) {
+ int offset = pos * 4;
+ bytes[offset++] = (byte) (value >> 24);
+ bytes[offset++] = (byte) (value >> 16);
+ bytes[offset++] = (byte) (value >> 8);
+ bytes[offset++] = (byte) (value);
+ }
+
+ public int capacity() {
+ return bytes.length / 4;
+ }
+}
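
IntSerDeBuffer views a frame's backing byte[] as an array of big-endian 32-bit integers, addressing int position pos at byte offset pos * 4. A self-contained sketch of the same encoding on a plain byte array; the names are illustrative only:

    public final class IntSerDeSketch {
        // Write a 32-bit value big-endian at int position "pos" (byte offset pos * 4).
        static void writeInt(byte[] bytes, int pos, int value) {
            int off = pos * 4;
            bytes[off] = (byte) (value >> 24);
            bytes[off + 1] = (byte) (value >> 16);
            bytes[off + 2] = (byte) (value >> 8);
            bytes[off + 3] = (byte) value;
        }

        static int getInt(byte[] bytes, int pos) {
            int off = pos * 4;
            return ((bytes[off] & 0xff) << 24) + ((bytes[off + 1] & 0xff) << 16)
                    + ((bytes[off + 2] & 0xff) << 8) + (bytes[off + 3] & 0xff);
        }

        public static void main(String[] args) {
            byte[] frame = new byte[16];     // room for four ints
            writeInt(frame, 2, -123456789);  // round-trips negative values too
            System.out.println(getInt(frame, 2)); // prints -123456789
        }
    }

The capacity() of such a buffer is bytes.length / 4, which is the frameCapacity used by the hash table's header and content addressing above.
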
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java
new file mode 100644
index 0000000..6618fb1
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java
@@ -0,0 +1,6 @@
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public class TuplePointer {
+ public int frameIndex;
+ public int tupleIndex;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/StringSerializationUtils.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/StringSerializationUtils.java
new file mode 100644
index 0000000..1126c1f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/StringSerializationUtils.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.util;
+
+import java.util.Arrays;
+
+public class StringSerializationUtils {
+ public static String toString(Object object) {
+ if (object instanceof Object[]) {
+ return Arrays.deepToString((Object[]) object);
+ } else {
+ return String.valueOf(object);
+ }
+ }
+}
\ No newline at end of file
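
With this helper, the PrinterOperatorDescriptor change earlier in the patch prints array-valued fields element by element instead of the default Object.toString of an array. A short stand-alone example of the difference; the class name is hypothetical:

    import java.util.Arrays;

    public class StringSerializationExample {
        public static void main(String[] args) {
            Object[] nested = new Object[] { 1, "a", new Object[] { 2, 3 } };
            // String.valueOf prints something like "[Ljava.lang.Object;@1b6d3586".
            System.out.println(String.valueOf(nested));
            // Arrays.deepToString, which StringSerializationUtils.toString delegates to
            // for arrays, prints "[1, a, [2, 3]]".
            System.out.println(Arrays.deepToString(nested));
        }
    }
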
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
index 694f183..b107282 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
@@ -26,23 +26,24 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.IntegerBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MinMaxAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.SumAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.AvgAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.ConcatAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IntSumAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -50,74 +51,107 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.ExternalHashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableGroupingTableFactory;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
/**
- * Test cases for external hash group operator.
+ * @author jarodwen
*/
public class ExternalAggregateTest extends AbstractIntegrationTest {
- /**
- * Test 01: aggregate (count) on single field, on a simple data set.
- *
- * @throws Exception
- */
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
+ new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+
+ static final String outSplitsPrefix = System.getProperty("java.io.tmpdir");
+
+ static final String outSplits1 = "nc1:" + outSplitsPrefix + "/aggregation_";
+ static final String outSplits2 = "nc2:" + outSplitsPrefix + "/aggregation_";
+
+ static final boolean isOutputFile = true;
+
+ final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|');
+
+ private static FileSplit[] parseFileSplits(String fileSplits) {
+ String[] splits = fileSplits.split(",");
+ FileSplit[] fSplits = new FileSplit[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ String s = splits[i].trim();
+ int idx = s.indexOf(':');
+ if (idx < 0) {
+ throw new IllegalArgumentException("File split " + s + " not well formed");
+ }
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ }
+ return fSplits;
+ }
+
+ private static AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, boolean isFile,
+ String prefix) {
+ AbstractSingleActivityOperatorDescriptor printer;
+
+ if (!isOutputFile)
+ printer = new PrinterOperatorDescriptor(spec);
+ else
+ printer = new PlainFileWriterOperatorDescriptor(spec, new ConstantFileSplitProvider(
+ parseFileSplits(outSplits1 + prefix + ".nc1, " + outSplits2 + prefix + ".nc2")), "\t");
+
+ return printer;
+ }
+
@Test
- public void externalAggregateTestSingleFieldSimpleData() throws Exception {
+ public void hashSingleKeyScalarGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit(NC2_ID, new FileReference(new File("data/wordcount.tsv"))),
- new FileSplit(NC1_ID, new FileReference(new File("data/wordcount.tsv"))) });
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- // Input format: a string field as the key
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- // Output format: a string field as the key, and an integer field as the
- // count
RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- // Data set format: word(string),count(int)
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec,
- splitProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
- desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID, NC1_ID);
-
- int[] keys = new int[] { 0 };
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 3;
int tableSize = 8;
- ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
- spec, // Job conf
- keys, // Group key
- 3, // Number of frames
- false, // Whether to sort the output
- // Hash partitioner
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- // Key comparator
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- // Aggregator factory
- new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- outputRec, // Output format
- tableSize // Size of the hashing table, which is used to control
- // the partition when hashing
- );
+ new UTF8StringNormalizedKeyComputerFactory(), new CountAggregatorDescriptorFactory(),
+ new IntSumAggregatorDescriptorFactory(keyFields.length), outputRec,
+ new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize),
+ true);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashSingleKeyScalarGroupTest");
+
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -127,186 +161,41 @@
runTest(spec);
}
- /**
- * Test 02: Control experiment using in-memory aggregator, on the same data
- * set of {@link #externalAggregateTest01()}
- *
- * @throws Exception
- */
@Test
- public void externalAggregateTestSingleFieldSimpleDataInMemControl() throws Exception {
+ public void hashMultipleKeyScalarGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit(NC2_ID, new FileReference(new File("data/wordcount.tsv"))),
- new FileSplit(NC1_ID, new FileReference(new File("data/wordcount.tsv"))) });
-
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
-
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec,
- splitProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID, NC1_ID);
- int[] keys = new int[] { 0 };
- int tableSize = 8;
-
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keys,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- outputRec, tableSize);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- /**
- * Test 03: aggregates on multiple fields
- *
- * @throws Exception
- */
- @Test
- public void externalAggregateTestMultiAggFields() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/lineitem.tbl"))) };
- IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
- FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
-
- int[] keys = new int[] { 0 };
- int tableSize = 8;
-
- ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3, false,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
- new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }), outputRec, tableSize);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, ordScanner, 0, grouper, 0);
-
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- /**
- * Test 05: aggregate on multiple key fields
- *
- * @throws Exception
- */
- @Test
- public void externalAggregateTestMultiKeys() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/lineitem.tbl"))) };
- IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
- FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ IntegerSerializerDeserializer.INSTANCE, });
- // Group on two fields
- int[] keys = new int[] { 0, 1 };
+ int[] keyFields = new int[] { 0, 9 };
+ int frameLimits = 3;
int tableSize = 8;
- ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3, false,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
- new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }), outputRec,
- tableSize);
+ UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
+ new IntSumAggregatorDescriptorFactory(1), new IntSumAggregatorDescriptorFactory(keyFields.length),
+ outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, ordScanner, 0, grouper, 0);
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashMultipleKeyScalarGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -315,66 +204,44 @@
runTest(spec);
}
- /**
- * Test 06: tests on non-string key field
- *
- * @throws Exception
- */
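+ /**
+ * Tests the external group operator on two string key fields with
+ * multiple integer-sum aggregators.
+ *
+ * @throws Exception
+ */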
@Test
- public void externalAggregateTestNonStringKey() throws Exception {
+ public void hashMultipleKeyMultipleScalarGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/lineitem.tbl"))) };
- IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE, });
- // Group on two fields
- int[] keys = new int[] { 0, 1 };
+ int[] keyFields = new int[] { 0, 9 };
+ int frameLimits = 3;
int tableSize = 8;
- ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3000, true,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- IntegerBinaryHashFunctionFactory.INSTANCE, IntegerBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
- new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }), outputRec,
- tableSize);
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
+ new IntSumAggregatorDescriptorFactory(1, 2), new IntSumAggregatorDescriptorFactory(2, 3) }),
+ new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
+ new IntSumAggregatorDescriptorFactory(2, 2), new IntSumAggregatorDescriptorFactory(3, 3) }),
+ outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- IntegerBinaryHashFunctionFactory.INSTANCE, IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, ordScanner, 0, grouper, 0);
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashMultipleKeyMultipleScalarGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -382,4 +249,138 @@
spec.addRoot(printer);
runTest(spec);
}
-}
\ No newline at end of file
+
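+ /**
+ * Tests the external group operator with a non-scalar (string-concatenating)
+ * aggregator.
+ *
+ * @throws Exception
+ */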
+ @Test
+ public void hashMultipleKeyNonScalarGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 3;
+ int tableSize = 8;
+
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(), new ConcatAggregatorDescriptorFactory(9),
+ new ConcatAggregatorDescriptorFactory(keyFields.length), outputRec,
+ new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize),
+ true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashMultipleKeyNonScalarGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
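+ /**
+ * Tests the external group operator on two key fields, combining
+ * integer-sum and string-concatenating aggregators.
+ *
+ * @throws Exception
+ */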
+ @Test
+ public void hashMultipleKeyMultipleFieldsGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0, 9 };
+ int frameLimits = 3;
+ int tableSize = 8;
+
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
+ new IntSumAggregatorDescriptorFactory(1, 2), new IntSumAggregatorDescriptorFactory(2, 3),
+ new ConcatAggregatorDescriptorFactory(9, 4) }), new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(2, 2),
+ new IntSumAggregatorDescriptorFactory(3, 3),
+ new ConcatAggregatorDescriptorFactory(4, 4) }), outputRec,
+ new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashMultipleKeyMultipleFieldsGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
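+ /**
+ * Tests the external group operator with the AVG aggregator on a single key field.
+ *
+ * @throws Exception
+ */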
+ @Test
+ public void hashSingleKeyScalarAvgGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 3;
+ int tableSize = 8;
+
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(), new AvgAggregatorDescriptorFactory(1),
+ new AvgAggregatorDescriptorFactory(keyFields.length), outputRec, new HashSpillableGroupingTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+ "hashSingleKeyScalarAvgGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+}
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 8d22bca..b02b0ee 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -36,13 +36,18 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IntSumAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
@@ -55,8 +60,9 @@
import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.ExternalHashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableGroupingTableFactory;
import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
@@ -112,7 +118,7 @@
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i % 2),
- options.htSize, options.sbSize, options.framesLimit, options.sortOutput, i % 2, options.outPlain);
+ options.htSize, options.sbSize, options.framesLimit, options.sortOutput, i % 3, options.outPlain);
System.out.print(i + "\t" + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
@@ -195,14 +201,22 @@
switch (alg) {
case 0: // External hash group
- grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, framesLimit, false,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
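+ // The first aggregator factory counts tuples per key; the second sums the
+ // partial counts when merging spilled runs back together.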
+ grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keys,
+ framesLimit,
+ new IBinaryComparatorFactory[] {
// IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
- htSize);
+ IntegerBinaryComparatorFactory.INSTANCE },
+ new IntegerNormalizedKeyComputerFactory(),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(keys.length) }),
+ outDesc, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }), htSize), false);
createPartitionConstraint(spec, grouper, outSplits);
@@ -258,23 +272,31 @@
spec.connect(scanConn, fileScanner, 0, grouper, 0);
break;
default:
- grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, framesLimit, false,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
+ grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keys,
+ framesLimit,
+ new IBinaryComparatorFactory[] {
// IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
- htSize);
+ IntegerBinaryComparatorFactory.INSTANCE },
+ new IntegerNormalizedKeyComputerFactory(),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(keys.length) }),
+ outDesc, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }), htSize), false);
createPartitionConstraint(spec, grouper, outSplits);
// Connect scanner with the grouper
- IConnectorDescriptor scanGroupConnDef = new MToNHashPartitioningConnectorDescriptor(spec,
+ IConnectorDescriptor defaultGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
// IntegerBinaryHashFunctionFactory.INSTANCE,
IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
+ spec.connect(defaultGroupConn, fileScanner, 0, grouper, 0);
}
IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);