Update issue #52:
Added new aggregator interfaces and group operator implementations, consolidating the old aggregator interfaces.
A simple integer-sum aggregator is included to demonstrate how the new interfaces are used (see the sketch below).
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@860 123451ca-8445-de46-9d55-352943316053
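
For context, the new operator can be wired into a job roughly as in the sketch below. Only the ExternalGroupOperatorDescriptor and HashSpillableTableFactory constructors are taken from this patch; the IntSumFieldAggregatorFactory name and its constructor argument, as well as the surrounding variables (spec, framesLimit, and so on), are assumptions for illustration.

    // Hypothetical usage sketch; factory names and arguments are assumptions.
    ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
            spec,                   // the JobSpecification being built
            new int[] { 0 },        // group by the first field
            framesLimit,            // at least 2 (see the constructor check)
            comparatorFactories,    // one comparator per key field
            firstNormalizerFactory, // may be null (no normalized keys)
            new IFieldAggregateDescriptorFactory[] {
                    new IntSumFieldAggregatorFactory(1) }, // aggregate phase
            new IFieldAggregateDescriptorFactory[] {
                    new IntSumFieldAggregatorFactory(1) }, // merge phase
            outRecordDescriptor,    // keys followed by one aggregate field
            new HashSpillableTableFactory(tpcf, tableSize),
            true);                  // sort the final output
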
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
new file mode 100644
index 0000000..b73cf46
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
@@ -0,0 +1,761 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+/**
+ * An external (spillable) group-by operator. Input tuples are aggregated
+ * into an in-memory table; when the table outgrows the frame limit, it is
+ * sorted and spilled to run files, which are then merged to produce the
+ * final aggregation results.
+ */
+public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
+ private static final int AGGREGATE_ACTIVITY_ID = 0;
+
+ private static final int MERGE_ACTIVITY_ID = 1;
+
+ private static final long serialVersionUID = 1L;
+ private final int[] keyFields;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final INormalizedKeyComputerFactory firstNormalizerFactory;
+
+ private final IFieldAggregateDescriptorFactory[] aggregateFactories;
+ private final IFieldAggregateDescriptorFactory[] mergeFactories;
+ private final int framesLimit;
+ private final ISpillableTableFactory spillableTableFactory;
+ private final boolean isOutputSorted;
+
+ public ExternalGroupOperatorDescriptor(JobSpecification spec,
+ int[] keyFields, int framesLimit,
+ IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory firstNormalizerFactory,
+ IFieldAggregateDescriptorFactory[] aggregateFactories,
+ IFieldAggregateDescriptorFactory[] mergeFactories,
+ RecordDescriptor recordDescriptor,
+ ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
+ super(spec, 1, 1);
+ this.framesLimit = framesLimit;
+ if (framesLimit <= 1) {
+ /**
+ * Minimum of 2 frames: 1 for input records, and 1 for output
+ * aggregation results.
+ */
+ throw new IllegalStateException(
+ "The frame limit must be at least 2, but it is "
+ + framesLimit + "!");
+ }
+ this.aggregateFactories = aggregateFactories;
+ this.mergeFactories = mergeFactories;
+ this.keyFields = keyFields;
+ this.comparatorFactories = comparatorFactories;
+ this.firstNormalizerFactory = firstNormalizerFactory;
+ this.spillableTableFactory = spillableTableFactory;
+ this.isOutputSorted = isOutputSorted;
+
+ /**
+ * Set the record descriptor. Note that since this operator is a unary
+ * operator, only the first record descriptor is used here.
+ */
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
+ * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
+ */
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(
+ getOperatorId(), AGGREGATE_ACTIVITY_ID));
+ MergeActivity mergeAct = new MergeActivity(new ActivityId(odId,
+ MERGE_ACTIVITY_ID));
+
+ builder.addActivity(aggregateAct);
+ builder.addSourceEdge(0, aggregateAct, 0);
+
+ builder.addActivity(mergeAct);
+ builder.addTargetEdge(0, mergeAct, 0);
+
+ builder.addBlockingEdge(aggregateAct, mergeAct);
+ }
+
+ public static class AggregateActivityState extends AbstractTaskState {
+ private LinkedList<RunFileReader> runs;
+
+ private ISpillableTable gTable;
+
+ public AggregateActivityState() {
+ }
+
+ private AggregateActivityState(JobId jobId, TaskId tId) {
+ super(jobId, tId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class AggregateActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public AggregateActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider,
+ final int partition, int nPartitions)
+ throws HyracksDataException {
+ final FrameTupleAccessor accessor = new FrameTupleAccessor(
+ ctx.getFrameSize(),
+ recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0));
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private AggregateActivityState state;
+
+ @Override
+ public void open() throws HyracksDataException {
+ state = new AggregateActivityState(ctx.getJobletContext()
+ .getJobId(), new TaskId(getActivityId(), partition));
+ state.runs = new LinkedList<RunFileReader>();
+ state.gTable = spillableTableFactory.buildSpillableTable(
+ ctx, keyFields, comparatorFactories,
+ firstNormalizerFactory, aggregateFactories,
+ recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0), recordDescriptors[0],
+ ExternalGroupOperatorDescriptor.this.framesLimit);
+ state.gTable.reset();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ /**
+ * If the group table is too large, flush the table into
+ * a run file.
+ */
+ if (!state.gTable.insert(accessor, i)) {
+ flushFramesToRun();
+ if (!state.gTable.insert(accessor, i))
+ throw new HyracksDataException(
+ "Failed to insert a tuple into the grouping table, even after flushing to a run file.");
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (state.gTable.getFrameCount() >= 0) {
+ if (state.runs.size() > 0) {
+ /**
+ * Flush the remaining in-memory groups into a run file.
+ */
+ flushFramesToRun();
+ state.gTable.close();
+ state.gTable = null;
+ }
+ }
+ ctx.setTaskState(state);
+ }
+
+ private void flushFramesToRun() throws HyracksDataException {
+ FileReference runFile;
+ try {
+ runFile = ctx.getJobletContext()
+ .createManagedWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class
+ .getSimpleName());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ RunFileWriter writer = new RunFileWriter(runFile,
+ ctx.getIOManager());
+ writer.open();
+ try {
+ state.gTable.sortFrames();
+ state.gTable.flushFrames(writer, true);
+ } catch (Exception ex) {
+ throw new HyracksDataException(ex);
+ } finally {
+ writer.close();
+ }
+ state.gTable.reset();
+ state.runs.add(((RunFileWriter) writer).createReader());
+ }
+ };
+ return op;
+ }
+ }
+
+ private class MergeActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public MergeActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider,
+ final int partition, int nPartitions)
+ throws HyracksDataException {
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i]
+ .createBinaryComparator();
+ }
+ final IFieldAggregateDescriptor[] currentWorkingAggregators = new IFieldAggregateDescriptor[mergeFactories.length];
+ final IAggregateState[] aggregateStates = new IAggregateState[mergeFactories.length];
+ for (int i = 0; i < currentWorkingAggregators.length; i++) {
+ currentWorkingAggregators[i] = mergeFactories[i]
+ .createAggregator(ctx, recordDescriptors[0],
+ recordDescriptors[0]);
+ aggregateStates[i] = currentWorkingAggregators[i].createState();
+ }
+ final int[] storedKeys = new int[keyFields.length];
+ /**
+ * Get the list of the fields in the stored records.
+ */
+ for (int i = 0; i < keyFields.length; ++i) {
+ storedKeys[i] = i;
+ }
+ /**
+ * Tuple builder
+ */
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
+ recordDescriptors[0].getFields().length);
+
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+ /**
+ * Input frames, one for each run file.
+ */
+ private List<ByteBuffer> inFrames;
+
+ /**
+ * Output frame.
+ */
+ private ByteBuffer outFrame, writerFrame;
+
+ private LinkedList<RunFileReader> runs;
+
+ private AggregateActivityState aggState;
+
+ /**
+ * How many frames to read ahead at once for each run.
+ */
+ private int runFrameLimit = 1;
+
+ private int[] currentFrameIndexInRun;
+ private int[] currentRunFrames;
+ private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
+ ctx.getFrameSize(), recordDescriptors[0]);
+ private ArrayTupleBuilder finalTupleBuilder;
+ private FrameTupleAppender writerFrameAppender;
+
+ public void initialize() throws HyracksDataException {
+ aggState = (AggregateActivityState) ctx
+ .getTaskState(new TaskId(new ActivityId(
+ getOperatorId(), AGGREGATE_ACTIVITY_ID),
+ partition));
+ runs = aggState.runs;
+ writer.open();
+ try {
+ if (runs.size() <= 0) {
+ ISpillableTable gTable = aggState.gTable;
+ if (gTable != null) {
+ if (isOutputSorted)
+ gTable.sortFrames();
+ gTable.flushFrames(writer, false);
+ }
+ } else {
+ runs = new LinkedList<RunFileReader>(runs);
+ inFrames = new ArrayList<ByteBuffer>();
+ outFrame = ctx.allocateFrame();
+ outFrameAppender.reset(outFrame, true);
+ outFrameAccessor.reset(outFrame);
+ while (runs.size() > 0) {
+ try {
+ doPass(runs);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ inFrames.clear();
+ }
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+
+ private void doPass(LinkedList<RunFileReader> runs)
+ throws HyracksDataException {
+ FileReference newRun = null;
+ IFrameWriter writer = this.writer;
+ boolean finalPass = false;
+
+ while (inFrames.size() + 2 < framesLimit) {
+ inFrames.add(ctx.allocateFrame());
+ }
+ int runNumber;
+ if (runs.size() + 2 <= framesLimit) {
+ finalPass = true;
+ runFrameLimit = (framesLimit - 2) / runs.size();
+ runNumber = runs.size();
+ } else {
+ runNumber = framesLimit - 2;
+ newRun = ctx.getJobletContext()
+ .createManagedWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class
+ .getSimpleName());
+ writer = new RunFileWriter(newRun, ctx.getIOManager());
+ writer.open();
+ }
+ try {
+ currentFrameIndexInRun = new int[runNumber];
+ currentRunFrames = new int[runNumber];
+ /**
+ * Create file readers for each input run file, but only
+ * for the ones that fit into the inFrames.
+ */
+ RunFileReader[] runFileReaders = new RunFileReader[runNumber];
+ FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
+ .size()];
+ Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+ ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
+ ctx.getFrameSize(), recordDescriptors[0],
+ runNumber, comparator);
+ /**
+ * current tuple index in each run
+ */
+ int[] tupleIndices = new int[runNumber];
+
+ for (int runIndex = runNumber - 1; runIndex >= 0; runIndex--) {
+ tupleIndices[runIndex] = 0;
+ // Load the run file
+ runFileReaders[runIndex] = runs.get(runIndex);
+ runFileReaders[runIndex].open();
+
+ currentRunFrames[runIndex] = 0;
+ currentFrameIndexInRun[runIndex] = runIndex
+ * runFrameLimit;
+ for (int j = 0; j < runFrameLimit; j++) {
+ int frameIndex = currentFrameIndexInRun[runIndex]
+ + j;
+ if (runFileReaders[runIndex].nextFrame(inFrames
+ .get(frameIndex))) {
+ tupleAccessors[frameIndex] = new FrameTupleAccessor(
+ ctx.getFrameSize(),
+ recordDescriptors[0]);
+ tupleAccessors[frameIndex].reset(inFrames
+ .get(frameIndex));
+ currentRunFrames[runIndex]++;
+ if (j == 0)
+ setNextTopTuple(runIndex, tupleIndices,
+ runFileReaders, tupleAccessors,
+ topTuples);
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Start merging
+ */
+ while (!topTuples.areRunsExhausted()) {
+ /**
+ * Get the top record
+ */
+ ReferenceEntry top = topTuples.peek();
+ int tupleIndex = top.getTupleIndex();
+ int runIndex = topTuples.peek().getRunid();
+ FrameTupleAccessor fta = top.getAccessor();
+
+ int currentTupleInOutFrame = outFrameAccessor
+ .getTupleCount() - 1;
+ if (currentTupleInOutFrame < 0
+ || compareFrameTuples(fta, tupleIndex,
+ outFrameAccessor,
+ currentTupleInOutFrame) != 0) {
+ /**
+ * Start a new output group: reset the tuple
+ * builder, copy the group keys, and initialize
+ * the aggregators.
+ */
+ tupleBuilder.reset();
+ for (int i = 0; i < keyFields.length; i++) {
+ tupleBuilder.addField(fta, tupleIndex, i);
+ }
+ for (int i = 0; i < currentWorkingAggregators.length; i++) {
+ currentWorkingAggregators[i].init(fta,
+ tupleIndex,
+ tupleBuilder.getDataOutput(),
+ aggregateStates[i]);
+ tupleBuilder.addFieldEndOffset();
+ }
+ if (!outFrameAppender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ flushOutFrame(writer, finalPass);
+ if (!outFrameAppender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize()))
+ throw new HyracksDataException(
+ "Failed to append an aggregation result to the output frame.");
+ }
+ } else {
+ /**
+ * The new tuple is in the same group as the
+ * current one in the outFrame: merge it into
+ * the existing aggregate values.
+ */
+ int tupleOffset = outFrameAccessor
+ .getTupleStartOffset(currentTupleInOutFrame);
+ int fieldOffset = outFrameAccessor
+ .getFieldStartOffset(
+ currentTupleInOutFrame,
+ keyFields.length);
+ for (int i = 0; i < currentWorkingAggregators.length; i++) {
+ currentWorkingAggregators[i]
+ .aggregate(
+ fta,
+ tupleIndex,
+ outFrameAccessor
+ .getBuffer()
+ .array(),
+ tupleOffset
+ + outFrameAccessor
+ .getFieldSlotsLength()
+ + fieldOffset,
+ aggregateStates[i]);
+ }
+ }
+ tupleIndices[runIndex]++;
+ setNextTopTuple(runIndex, tupleIndices,
+ runFileReaders, tupleAccessors, topTuples);
+ }
+
+ if (outFrameAppender.getTupleCount() > 0) {
+ flushOutFrame(writer, finalPass);
+ }
+
+ for (int i = 0; i < currentWorkingAggregators.length; i++) {
+ currentWorkingAggregators[i].close();
+ }
+ runs.subList(0, runNumber).clear();
+ /**
+ * Insert the new run file at the beginning of
+ * the run file list.
+ */
+ if (!finalPass) {
+ runs.add(0, ((RunFileWriter) writer).createReader());
+ }
+ } finally {
+ if (!finalPass) {
+ writer.close();
+ }
+ }
+ }
+
+ private void flushOutFrame(IFrameWriter writer, boolean isFinal)
+ throws HyracksDataException {
+ if (finalTupleBuilder == null) {
+ finalTupleBuilder = new ArrayTupleBuilder(
+ recordDescriptors[0].getFields().length);
+ }
+ if (writerFrame == null) {
+ writerFrame = ctx.allocateFrame();
+ }
+ if (writerFrameAppender == null) {
+ writerFrameAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ writerFrameAppender.reset(writerFrame, true);
+ }
+ outFrameAccessor.reset(outFrame);
+
+ for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+ int tupleOffset = outFrameAccessor
+ .getTupleStartOffset(i);
+ finalTupleBuilder.reset();
+ for (int j = 0; j < keyFields.length; j++) {
+ finalTupleBuilder.addField(outFrameAccessor, i, j);
+ }
+ for (int j = 0; j < currentWorkingAggregators.length; j++) {
+ int fieldOffset = outFrameAccessor
+ .getFieldStartOffset(i, keyFields.length
+ + j);
+ if (isFinal)
+ currentWorkingAggregators[j].outputFinalResult(
+ finalTupleBuilder.getDataOutput(),
+ outFrameAccessor.getBuffer().array(),
+ tupleOffset
+ + outFrameAccessor
+ .getFieldSlotsLength()
+ + fieldOffset,
+ aggregateStates[j]);
+ else
+ currentWorkingAggregators[j]
+ .outputPartialResult(
+ finalTupleBuilder
+ .getDataOutput(),
+ outFrameAccessor.getBuffer()
+ .array(),
+ tupleOffset
+ + outFrameAccessor
+ .getFieldSlotsLength()
+ + fieldOffset,
+ aggregateStates[j]);
+ finalTupleBuilder.addFieldEndOffset();
+ }
+
+ if (!writerFrameAppender.append(
+ finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0,
+ finalTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerFrameAppender.reset(writerFrame, true);
+ if (!writerFrameAppender.append(
+ finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0,
+ finalTupleBuilder.getSize()))
+ throw new HyracksDataException(
+ "Failed to write final aggregation result to a writer frame!");
+ }
+ }
+ if (writerFrameAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerFrameAppender.reset(writerFrame, true);
+ }
+ outFrameAppender.reset(outFrame, true);
+ }
+
+ private void setNextTopTuple(int runIndex, int[] tupleIndices,
+ RunFileReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors,
+ ReferencedPriorityQueue topTuples)
+ throws HyracksDataException {
+ int runStart = runIndex * runFrameLimit;
+ boolean existNext = false;
+ if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null
+ || runCursors[runIndex] == null) {
+ /**
+ * run already closed
+ */
+ existNext = false;
+ } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
+ /**
+ * not the last frame for this run
+ */
+ existNext = true;
+ if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]]
+ .getTupleCount()) {
+ tupleIndices[runIndex] = 0;
+ currentFrameIndexInRun[runIndex]++;
+ }
+ } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]]
+ .getTupleCount()) {
+ /**
+ * In the last loaded frame of this run, but
+ * there are still tuples left in it.
+ */
+ existNext = true;
+ } else {
+ /**
+ * All tuples in the loaded frames of this run
+ * have been consumed; refill the frames.
+ */
+ int frameOffset = runIndex * runFrameLimit;
+ tupleIndices[runIndex] = 0;
+ currentFrameIndexInRun[runIndex] = frameOffset;
+ /**
+ * read in batch
+ */
+ currentRunFrames[runIndex] = 0;
+ for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
+ ByteBuffer buffer = tupleAccessors[frameOffset]
+ .getBuffer();
+ if (runCursors[runIndex].nextFrame(buffer)) {
+ tupleAccessors[frameOffset].reset(buffer);
+ if (tupleAccessors[frameOffset].getTupleCount() > 0) {
+ existNext = true;
+ } else {
+ throw new IllegalStateException(
+ "illegal: empty run file");
+ }
+ currentRunFrames[runIndex]++;
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (existNext) {
+ topTuples
+ .popAndReplace(
+ tupleAccessors[currentFrameIndexInRun[runIndex]],
+ tupleIndices[runIndex]);
+ } else {
+ topTuples.pop();
+ closeRun(runIndex, runCursors, tupleAccessors);
+ }
+ }
+
+ /**
+ * Close the run file reader for the given run; the
+ * corresponding input frames can then be reused.
+ *
+ * @param index
+ * @param runCursors
+ * @param tupleAccessor
+ * @throws HyracksDataException
+ */
+ private void closeRun(int index, RunFileReader[] runCursors,
+ IFrameTupleAccessor[] tupleAccessor)
+ throws HyracksDataException {
+ if (runCursors[index] != null) {
+ runCursors[index].close();
+ runCursors[index] = null;
+ }
+ }
+
+ private int compareFrameTuples(IFrameTupleAccessor fta1,
+ int j1, IFrameTupleAccessor fta2, int j2) {
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1)
+ + fta1.getFieldSlotsLength()
+ + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldLength(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2)
+ + fta2.getFieldSlotsLength()
+ + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldLength(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+ };
+ return op;
+ }
+
+ private Comparator<ReferenceEntry> createEntryComparator(
+ final IBinaryComparator[] comparators) {
+ return new Comparator<ReferenceEntry>() {
+
+ @Override
+ public int compare(ReferenceEntry o1, ReferenceEntry o2) {
+ FrameTupleAccessor fta1 = (FrameTupleAccessor) o1
+ .getAccessor();
+ FrameTupleAccessor fta2 = (FrameTupleAccessor) o2
+ .getAccessor();
+ int j1 = o1.getTupleIndex();
+ int j2 = o2.getTupleIndex();
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1)
+ + fta1.getFieldSlotsLength()
+ + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldEndOffset(j1, fIdx)
+ - fta1.getFieldStartOffset(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2)
+ + fta2.getFieldSlotsLength()
+ + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldEndOffset(j2, fIdx)
+ - fta2.getFieldStartOffset(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ };
+ }
+
+ }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
new file mode 100644
index 0000000..bdcc2e9
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+
+class GroupingHashTable {
+ /**
+ * The pointers in the link store 3 int values for each entry in the
+ * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+ *
+ * @author vinayakb
+ */
+ private static class Link {
+ private static final int INIT_POINTERS_SIZE = 9;
+
+ int[] pointers;
+ int size;
+
+ Link() {
+ pointers = new int[INIT_POINTERS_SIZE];
+ size = 0;
+ }
+
+ void add(int bufferIdx, int tIndex, int accumulatorIdx) {
+ while (size + 3 > pointers.length) {
+ pointers = Arrays.copyOf(pointers, pointers.length * 2);
+ }
+ pointers[size++] = bufferIdx;
+ pointers[size++] = tIndex;
+ pointers[size++] = accumulatorIdx;
+ }
+ }
+
+ private static final int INIT_ACCUMULATORS_SIZE = 8;
+ private final IHyracksTaskContext ctx;
+ private final FrameTupleAppender appender;
+ private final List<ByteBuffer> buffers;
+ private final Link[] table;
+ private IAggregateState[][] aggregateStates;
+ private int accumulatorSize;
+
+ private int lastBIndex;
+ private final int[] fields;
+ private final int[] storedKeys;
+ private final IBinaryComparator[] comparators;
+ private final FrameTuplePairComparator ftpc;
+ private final ITuplePartitionComputer tpc;
+ private final IFieldAggregateDescriptor[] aggregators;
+ private final RecordDescriptor inRecordDescriptor;
+ private final RecordDescriptor outRecordDescriptor;
+
+ private final FrameTupleAccessor storedKeysAccessor;
+
+ GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
+ IBinaryComparatorFactory[] comparatorFactories,
+ ITuplePartitionComputerFactory tpcf,
+ IFieldAggregateDescriptorFactory[] aggregatorFactories,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int tableSize)
+ throws HyracksDataException {
+ this.ctx = ctx;
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ buffers = new ArrayList<ByteBuffer>();
+ table = new Link[tableSize];
+
+ this.aggregateStates = new IAggregateState[aggregatorFactories.length][INIT_ACCUMULATORS_SIZE];
+ accumulatorSize = 0;
+
+ this.fields = fields;
+ storedKeys = new int[fields.length];
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
+ for (int i = 0; i < fields.length; ++i) {
+ storedKeys[i] = i;
+ storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
+ }
+
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
+ tpc = tpcf.createPartitioner();
+
+ this.inRecordDescriptor = inRecordDescriptor;
+ this.outRecordDescriptor = outRecordDescriptor;
+
+ this.aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
+ for (int i = 0; i < aggregatorFactories.length; i++) {
+ this.aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
+ this.inRecordDescriptor, this.outRecordDescriptor);
+ }
+ RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
+ storedKeySerDeser);
+ storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ storedKeysRecordDescriptor);
+ lastBIndex = -1;
+ addNewBuffer();
+ }
+
+ private void addNewBuffer() {
+ ByteBuffer buffer = ctx.allocateFrame();
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ buffers.add(buffer);
+ appender.reset(buffer, true);
+ ++lastBIndex;
+ }
+
+ private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
+ throws HyracksDataException {
+ ByteBuffer frame = appender.getBuffer();
+ frame.position(0);
+ frame.limit(frame.capacity());
+ writer.nextFrame(appender.getBuffer());
+ appender.reset(appender.getBuffer(), true);
+ }
+
+ void insert(FrameTupleAccessor accessor, int tIndex)
+ throws Exception {
+ int entry = tpc.partition(accessor, tIndex, table.length);
+ Link link = table[entry];
+ if (link == null) {
+ link = table[entry] = new Link();
+ }
+ int saIndex = -1;
+ for (int i = 0; i < link.size; i += 3) {
+ int sbIndex = link.pointers[i];
+ int stIndex = link.pointers[i + 1];
+ storedKeysAccessor.reset(buffers.get(sbIndex));
+ int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
+ if (c == 0) {
+ saIndex = link.pointers[i + 2];
+ break;
+ }
+ }
+ if (saIndex < 0) {
+ // Did not find the key. Insert a new entry.
+ saIndex = accumulatorSize++;
+ if (!appender.appendProjection(accessor, tIndex, fields)) {
+ addNewBuffer();
+ if (!appender.appendProjection(accessor, tIndex, fields)) {
+ throw new IllegalStateException();
+ }
+ }
+ int sbIndex = lastBIndex;
+ int stIndex = appender.getTupleCount() - 1;
+ for (int i = 0; i < aggregators.length; i++) {
+ IAggregateState aggState = aggregators[i].createState();
+ aggregators[i].init(accessor, tIndex, null, aggState);
+ if (saIndex >= aggregateStates[i].length) {
+ aggregateStates[i] = Arrays.copyOf(aggregateStates[i],
+ aggregateStates[i].length * 2);
+ }
+ aggregateStates[i][saIndex] = aggState;
+ }
+ link.add(sbIndex, stIndex, saIndex);
+ } else {
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].aggregate(accessor, tIndex, null, 0,
+ aggregateStates[i][saIndex]);
+ }
+ }
+ }
+
+ void write(IFrameWriter writer) throws HyracksDataException {
+ ByteBuffer buffer = ctx.allocateFrame();
+ appender.reset(buffer, true);
+ ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
+ outRecordDescriptor.getFields().length);
+ for (int i = 0; i < table.length; ++i) {
+ Link link = table[i];
+ if (link != null) {
+ for (int j = 0; j < link.size; j += 3) {
+ int bIndex = link.pointers[j];
+ int tIndex = link.pointers[j + 1];
+ int aIndex = link.pointers[j + 2];
+ ByteBuffer keyBuffer = buffers.get(bIndex);
+ storedKeysAccessor.reset(keyBuffer);
+
+ tupleBuilder.reset();
+ for (int k : this.fields) {
+ tupleBuilder.addField(storedKeysAccessor, tIndex, k);
+ }
+ for (int k = 0; k < aggregators.length; k++) {
+ aggregators[k].outputFinalResult(
+ tupleBuilder.getDataOutput(), null, 0,
+ aggregateStates[k][aIndex]);
+ tupleBuilder.addFieldEndOffset();
+ }
+ while (!appender.append(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ flushFrame(appender, writer);
+ }
+ }
+ }
+ }
+ if (appender.getTupleCount() != 0) {
+ flushFrame(appender, writer);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
new file mode 100644
index 0000000..1b46bdd
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * An in-memory hash-based group-by operator. The first activity builds a
+ * hash table of groups over the input; the second activity writes the
+ * aggregated groups to the output.
+ */
+public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
+ private static final int HASH_BUILD_ACTIVITY_ID = 0;
+
+ private static final int OUTPUT_ACTIVITY_ID = 1;
+
+ private static final long serialVersionUID = 1L;
+
+ private final int[] keys;
+ private final ITuplePartitionComputerFactory tpcf;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+
+ private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
+
+ private final int tableSize;
+
+ public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys,
+ ITuplePartitionComputerFactory tpcf,
+ IBinaryComparatorFactory[] comparatorFactories,
+ IFieldAggregateDescriptorFactory[] aggregatorFactories,
+ RecordDescriptor outRecordDescriptor, int tableSize) {
+ super(spec, 1, 1);
+ this.keys = keys;
+ this.tpcf = tpcf;
+ this.comparatorFactories = comparatorFactories;
+ this.aggregatorFactories = aggregatorFactories;
+ recordDescriptors[0] = outRecordDescriptor;
+ this.tableSize = tableSize;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
+ * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
+ */
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId,
+ HASH_BUILD_ACTIVITY_ID));
+ builder.addActivity(ha);
+
+ OutputActivity oa = new OutputActivity(new ActivityId(odId,
+ OUTPUT_ACTIVITY_ID));
+ builder.addActivity(oa);
+
+ builder.addSourceEdge(0, ha, 0);
+ builder.addTargetEdge(0, oa, 0);
+ builder.addBlockingEdge(ha, oa);
+ }
+
+ public static class HashBuildActivityState extends AbstractTaskState {
+ private GroupingHashTable table;
+
+ public HashBuildActivityState() {
+ }
+
+ private HashBuildActivityState(JobId jobId, TaskId tId) {
+ super(jobId, tId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+
+ }
+ }
+
+ private class HashBuildActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public HashBuildActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider,
+ final int partition, int nPartitions) {
+ final FrameTupleAccessor accessor = new FrameTupleAccessor(
+ ctx.getFrameSize(),
+ recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0));
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ private HashBuildActivityState state;
+
+ @Override
+ public void open() throws HyracksDataException {
+ state = new HashBuildActivityState(ctx.getJobletContext()
+ .getJobId(), new TaskId(getActivityId(), partition));
+ state.table = new GroupingHashTable(ctx, keys,
+ comparatorFactories, tpcf, aggregatorFactories,
+ recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0), recordDescriptors[0],
+ tableSize);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; ++i) {
+ try {
+ state.table.insert(accessor, i);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ctx.setTaskState(state);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ throw new HyracksDataException(
+ "HashGroupOperator failed.");
+ }
+ };
+ }
+ }
+
+ private class OutputActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public OutputActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider,
+ final int partition, int nPartitions) {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ HashBuildActivityState buildState = (HashBuildActivityState) ctx
+ .getTaskState(new TaskId(new ActivityId(
+ getOperatorId(), HASH_BUILD_ACTIVITY_ID),
+ partition));
+ GroupingHashTable table = buildState.table;
+ writer.open();
+ try {
+ table.write(writer);
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
new file mode 100644
index 0000000..332e63e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
@@ -0,0 +1,589 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * A factory for hash-based spillable tables, used by the external group
+ * operator to maintain, sort, and spill partially aggregated groups.
+ */
+public class HashSpillableTableFactory implements ISpillableTableFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final ITuplePartitionComputerFactory tpcf;
+ private final int tableSize;
+
+ public HashSpillableTableFactory(ITuplePartitionComputerFactory tpcf,
+ int tableSize) {
+ this.tpcf = tpcf;
+ this.tableSize = tableSize;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * edu.uci.ics.hyracks.dataflow.std.aggregations.ISpillableTableFactory#
+ * buildSpillableTable(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+ * int[], edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory[],
+ * edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory,
+ * edu.
+ * uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory
+ * [], edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int)
+ */
+ @Override
+ public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx,
+ final int[] keyFields,
+ IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IFieldAggregateDescriptorFactory[] aggregatorFactories,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, final int framesLimit)
+ throws HyracksDataException {
+ final int[] storedKeys = new int[keyFields.length];
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
+ for (int i = 0; i < keyFields.length; i++) {
+ storedKeys[i] = i;
+ storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
+ }
+
+ RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
+ final FrameTupleAccessor storedKeysAccessor1;
+ final FrameTupleAccessor storedKeysAccessor2;
+ if (keyFields.length >= outRecordDescriptor.getFields().length) {
+ // For the case of zero aggregation fields: add one extra field slot.
+ ISerializerDeserializer<?>[] fields = outRecordDescriptor
+ .getFields();
+ ITypeTrait[] types = outRecordDescriptor.getTypeTraits();
+ ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
+ for (int i = 0; i < fields.length; i++)
+ newFields[i] = fields[i];
+ ITypeTrait[] newTypes = null;
+ if (types != null) {
+ newTypes = new ITypeTrait[types.length + 1];
+ for (int i = 0; i < types.length; i++)
+ newTypes[i] = types[i];
+ }
+ internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
+ }
+ storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
+ internalRecordDescriptor);
+ storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(),
+ internalRecordDescriptor);
+
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+
+ final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(
+ keyFields, storedKeys, comparators);
+ final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(
+ storedKeys, storedKeys, comparators);
+ final FrameTupleAppender appender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ final ITuplePartitionComputer tpc = tpcf.createPartitioner();
+ final ByteBuffer outFrame = ctx.allocateFrame();
+
+ final ArrayTupleBuilder internalTupleBuilder;
+ if (keyFields.length < outRecordDescriptor.getFields().length)
+ internalTupleBuilder = new ArrayTupleBuilder(
+ outRecordDescriptor.getFields().length);
+ else
+ internalTupleBuilder = new ArrayTupleBuilder(
+ outRecordDescriptor.getFields().length + 1);
+ final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(
+ outRecordDescriptor.getFields().length);
+ final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null
+ : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+
+ final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
+ final IAggregateState[] aggregateStates = new IAggregateState[aggregatorFactories.length];
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
+ inRecordDescriptor, outRecordDescriptor);
+ aggregateStates[i] = aggregators[i].createState();
+ }
+
+ return new ISpillableTable() {
+
+ private int dataFrameCount;
+ private final ISerializableTable table = new SerializableHashTable(
+ tableSize, ctx);
+ private final TuplePointer storedTuplePointer = new TuplePointer();
+ private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+
+ /**
+ * A tuple is "pointed" to by 3 entries in the tPointers array:
+ * [0] = index of the entry in the hash table, [1] = offset of
+ * the tuple within that entry, [2] = poor man's normalized key
+ * for the tuple.
+ */
+ private int[] tPointers;
+
+ @Override
+ public void sortFrames() {
+ int sfIdx = storedKeys[0];
+ int totalTCount = table.getTupleCount();
+ tPointers = new int[totalTCount * 3];
+ int ptr = 0;
+
+ for (int i = 0; i < tableSize; i++) {
+ int entry = i;
+ int offset = 0;
+ do {
+ table.getTuplePointer(entry, offset, storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ tPointers[ptr * 3] = entry;
+ tPointers[ptr * 3 + 1] = offset;
+ int fIndex = storedTuplePointer.frameIndex;
+ int tIndex = storedTuplePointer.tupleIndex;
+ storedKeysAccessor1.reset(frames.get(fIndex));
+ int tStart = storedKeysAccessor1
+ .getTupleStartOffset(tIndex);
+ int f0StartRel = storedKeysAccessor1
+ .getFieldStartOffset(tIndex, sfIdx);
+ int f0EndRel = storedKeysAccessor1.getFieldEndOffset(
+ tIndex, sfIdx);
+ int f0Start = f0StartRel + tStart
+ + storedKeysAccessor1.getFieldSlotsLength();
+ tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc
+ .normalize(storedKeysAccessor1.getBuffer()
+ .array(), f0Start, f0EndRel
+ - f0StartRel);
+ ptr++;
+ offset++;
+ } while (true);
+ }
+ /**
+ * Sort using quick sort
+ */
+ if (tPointers.length > 0) {
+ sort(tPointers, 0, totalTCount);
+ }
+ }
+
+ @Override
+ public void reset() {
+ dataFrameCount = -1;
+ tPointers = null;
+ table.reset();
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].close();
+ }
+ }
+
+ @Override
+ public boolean insert(FrameTupleAccessor accessor, int tIndex)
+ throws HyracksDataException {
+ if (dataFrameCount < 0)
+ nextAvailableFrame();
+ int entry = tpc.partition(accessor, tIndex, tableSize);
+ boolean foundGroup = false;
+ int offset = 0;
+ do {
+ table.getTuplePointer(entry, offset++, storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ storedKeysAccessor1.reset(frames
+ .get(storedTuplePointer.frameIndex));
+ int c = ftpcPartial.compare(accessor, tIndex,
+ storedKeysAccessor1, storedTuplePointer.tupleIndex);
+ if (c == 0) {
+ foundGroup = true;
+ break;
+ }
+ } while (true);
+
+ if (!foundGroup) {
+ /**
+ * If no matching group is found, create a new group:
+ * build a tuple with the group keys followed by the
+ * initialized aggregate fields.
+ */
+ internalTupleBuilder.reset();
+ for (int i = 0; i < keyFields.length; i++) {
+ internalTupleBuilder.addField(accessor, tIndex,
+ keyFields[i]);
+ }
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].init(accessor, tIndex,
+ internalTupleBuilder.getDataOutput(),
+ aggregateStates[i]);
+ internalTupleBuilder.addFieldEndOffset();
+ }
+ if (!appender.append(
+ internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0,
+ internalTupleBuilder.getSize())) {
+ if (!nextAvailableFrame()) {
+ return false;
+ } else {
+ if (!appender.append(
+ internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0,
+ internalTupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to append an initialized group to a new frame.");
+ }
+ }
+ }
+
+ storedTuplePointer.frameIndex = dataFrameCount;
+ storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
+ table.insert(entry, storedTuplePointer);
+ } else {
+ // If a matching group is found, aggregate directly into the stored tuple.
+ int tupleOffset = storedKeysAccessor1
+ .getTupleStartOffset(storedTuplePointer.tupleIndex);
+
+ for (int i = 0; i < aggregators.length; i++) {
+ int aggFieldOffset = storedKeysAccessor1
+ .getFieldStartOffset(
+ storedTuplePointer.tupleIndex,
+ keyFields.length + i);
+ aggregators[i].aggregate(
+ accessor,
+ tIndex,
+ storedKeysAccessor1.getBuffer().array(),
+ tupleOffset
+ + storedKeysAccessor1
+ .getFieldSlotsLength()
+ + aggFieldOffset, aggregateStates[i]);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public List<ByteBuffer> getFrames() {
+ return frames;
+ }
+
+ @Override
+ public int getFrameCount() {
+ return dataFrameCount;
+ }
+
+ @Override
+ public void flushFrames(IFrameWriter writer, boolean isPartial)
+ throws HyracksDataException {
+ FrameTupleAppender appender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ writer.open();
+ appender.reset(outFrame, true);
+ if (tPointers == null) {
+ // Not sorted
+ for (int i = 0; i < tableSize; ++i) {
+ int entry = i;
+ int offset = 0;
+ do {
+ table.getTuplePointer(entry, offset++,
+ storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ int bIndex = storedTuplePointer.frameIndex;
+ int tIndex = storedTuplePointer.tupleIndex;
+
+ storedKeysAccessor1.reset(frames.get(bIndex));
+ int tupleOffset = storedKeysAccessor1
+ .getTupleStartOffset(tIndex);
+ // Reset the tuple for the partial result
+ outputTupleBuilder.reset();
+ for (int k = 0; k < keyFields.length; k++) {
+ outputTupleBuilder.addField(
+ storedKeysAccessor1, tIndex, k);
+ }
+ for (int k = 0; k < aggregators.length; k++) {
+ int fieldStart = storedKeysAccessor1
+ .getFieldStartOffset(tIndex,
+ keyFields.length + k);
+ if (isPartial)
+ aggregators[k]
+ .outputPartialResult(
+ outputTupleBuilder
+ .getDataOutput(),
+ storedKeysAccessor1
+ .getBuffer()
+ .array(),
+ tupleOffset
+ + storedKeysAccessor1
+ .getFieldSlotsLength()
+ + fieldStart,
+ aggregateStates[k]);
+ else
+ aggregators[k]
+ .outputFinalResult(
+ outputTupleBuilder
+ .getDataOutput(),
+ storedKeysAccessor1
+ .getBuffer()
+ .array(),
+ tupleOffset
+ + storedKeysAccessor1
+ .getFieldSlotsLength()
+ + fieldStart,
+ aggregateStates[k]);
+ outputTupleBuilder.addFieldEndOffset();
+ }
+
+ while (!appender.append(
+ outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ }
+ } while (true);
+ }
+ if (appender.getTupleCount() != 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].close();
+ }
+ return;
+ }
+ int n = tPointers.length / 3;
+ for (int ptr = 0; ptr < n; ptr++) {
+ int tableIndex = tPointers[ptr * 3];
+ int rowIndex = tPointers[ptr * 3 + 1];
+ table.getTuplePointer(tableIndex, rowIndex,
+ storedTuplePointer);
+ int frameIndex = storedTuplePointer.frameIndex;
+ int tupleIndex = storedTuplePointer.tupleIndex;
+ // Get the frame containing the value
+ ByteBuffer buffer = frames.get(frameIndex);
+ storedKeysAccessor1.reset(buffer);
+
+ int tupleOffset = storedKeysAccessor1
+ .getTupleStartOffset(tupleIndex);
+
+ outputTupleBuilder.reset();
+ for (int k = 0; k < keyFields.length; k++) {
+ outputTupleBuilder.addField(storedKeysAccessor1,
+ tupleIndex, k);
+ }
+ for (int k = 0; k < aggregators.length; k++) {
+ int fieldStart = storedKeysAccessor1
+ .getFieldStartOffset(tupleIndex,
+ keyFields.length + k);
+ if (isPartial)
+ aggregators[k].outputPartialResult(
+ outputTupleBuilder.getDataOutput(),
+ storedKeysAccessor1.getBuffer().array(),
+ tupleOffset
+ + storedKeysAccessor1
+ .getFieldSlotsLength()
+ + fieldStart, aggregateStates[k]);
+ else
+ aggregators[k].outputFinalResult(outputTupleBuilder
+ .getDataOutput(), storedKeysAccessor1
+ .getBuffer().array(), tupleOffset
+ + storedKeysAccessor1.getFieldSlotsLength()
+ + fieldStart, aggregateStates[k]);
+ outputTupleBuilder.addFieldEndOffset();
+ }
+ if (!appender.append(
+ outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ if (!appender.append(
+ outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].close();
+ }
+ }
+
+ @Override
+ public void close() {
+ dataFrameCount = -1;
+ tPointers = null;
+ table.close();
+ frames.clear();
+ }
+
+ /**
+ * Set the working frame to the next available frame in the frame
+ * list. There are two cases:<br>
+ * 1) If the next frame is not initialized, allocate a new frame. 2)
+ * When frames are already created, they are recycled.
+ *
+ * @return Whether a new frame is added successfully.
+ */
+ private boolean nextAvailableFrame() {
+ // Return false if the number of frames is equal to the limit.
+ if (dataFrameCount + 1 >= framesLimit)
+ return false;
+
+ if (frames.size() < framesLimit) {
+ // Insert a new frame
+ ByteBuffer frame = ctx.allocateFrame();
+ frame.position(0);
+ frame.limit(frame.capacity());
+ frames.add(frame);
+ appender.reset(frame, true);
+ dataFrameCount = frames.size() - 1;
+ } else {
+ // Reuse an old frame
+ dataFrameCount++;
+ ByteBuffer frame = frames.get(dataFrameCount);
+ frame.position(0);
+ frame.limit(frame.capacity());
+ appender.reset(frame, true);
+ }
+ return true;
+ }
+
+ private void sort(int[] tPointers, int offset, int length) {
+ int m = offset + (length >> 1);
+ int mTable = tPointers[m * 3];
+ int mRow = tPointers[m * 3 + 1];
+ int mNormKey = tPointers[m * 3 + 2];
+
+ table.getTuplePointer(mTable, mRow, storedTuplePointer);
+ int mFrame = storedTuplePointer.frameIndex;
+ int mTuple = storedTuplePointer.tupleIndex;
+ storedKeysAccessor1.reset(frames.get(mFrame));
+
+ int a = offset;
+ int b = a;
+ int c = offset + length - 1;
+ int d = c;
+ while (true) {
+ while (b <= c) {
+ int bTable = tPointers[b * 3];
+ int bRow = tPointers[b * 3 + 1];
+ int bNormKey = tPointers[b * 3 + 2];
+ int cmp = 0;
+ if (bNormKey != mNormKey) {
+ cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1
+ : 1;
+ } else {
+ table.getTuplePointer(bTable, bRow,
+ storedTuplePointer);
+ int bFrame = storedTuplePointer.frameIndex;
+ int bTuple = storedTuplePointer.tupleIndex;
+ storedKeysAccessor2.reset(frames.get(bFrame));
+ cmp = ftpcTuple.compare(storedKeysAccessor2,
+ bTuple, storedKeysAccessor1, mTuple);
+ }
+ if (cmp > 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, a++, b);
+ }
+ ++b;
+ }
+ while (c >= b) {
+ int cTable = tPointers[c * 3];
+ int cRow = tPointers[c * 3 + 1];
+ int cNormKey = tPointers[c * 3 + 2];
+ int cmp = 0;
+ if (cNormKey != mNormKey) {
+ cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1
+ : 1;
+ } else {
+ table.getTuplePointer(cTable, cRow,
+ storedTuplePointer);
+ int cFrame = storedTuplePointer.frameIndex;
+ int cTuple = storedTuplePointer.tupleIndex;
+ storedKeysAccessor2.reset(frames.get(cFrame));
+ cmp = ftpcTuple.compare(storedKeysAccessor2,
+ cTuple, storedKeysAccessor1, mTuple);
+ }
+ if (cmp < 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, c, d--);
+ }
+ --c;
+ }
+ if (b > c)
+ break;
+ swap(tPointers, b++, c--);
+ }
+
+ int s;
+ int n = offset + length;
+ s = Math.min(a - offset, b - a);
+ vecswap(tPointers, offset, b - s, s);
+ s = Math.min(d - c, n - d - 1);
+ vecswap(tPointers, b, n - s, s);
+
+ if ((s = b - a) > 1) {
+ sort(tPointers, offset, s);
+ }
+ if ((s = d - c) > 1) {
+ sort(tPointers, n - s, s);
+ }
+ }
+
+ private void swap(int x[], int a, int b) {
+ for (int i = 0; i < 3; ++i) {
+ int t = x[a * 3 + i];
+ x[a * 3 + i] = x[b * 3 + i];
+ x[b * 3 + i] = t;
+ }
+ }
+
+ private void vecswap(int x[], int a, int b, int n) {
+ for (int i = 0; i < n; i++, a++, b++) {
+ swap(x, a, b);
+ }
+ }
+
+ };
+ }
+
+}
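
The sort(...) helper above is a three-way-partitioning quicksort over (table index, row index, normalized key) triplets packed into tPointers: tuples are compared by their 32-bit normalized keys first, treated as unsigned values via the long-widening mask, and the full binary comparison runs only on key ties. A minimal, self-contained sketch of that unsigned-comparison idiom (plain JDK, no Hyracks types; the class name is invented for illustration):

    // Compares two int keys as unsigned 32-bit values, as sort() does:
    // widen to long and mask off the sign extension before comparing.
    public class UnsignedKeyCompare {
        static int compareUnsigned(int a, int b) {
            long ua = ((long) a) & 0xffffffffL;
            long ub = ((long) b) & 0xffffffffL;
            return ua < ub ? -1 : (ua == ub ? 0 : 1);
        }

        public static void main(String[] args) {
            // 0xFFFFFFFF is -1 as a signed int but the largest unsigned value,
            // so it must compare greater than 1:
            System.out.println(compareUnsigned(0xFFFFFFFF, 1)); // prints 1
        }
    }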
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java
new file mode 100644
index 0000000..169ada7
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.Serializable;
+
+/**
+ * The running aggregation state of a single field. The state may live either
+ * inside a frame (accessed through a byte array and an offset) or as a plain
+ * Java object.
+ */
+public interface IAggregateState extends Serializable {
+
+    /**
+     * Return the length, in bytes, of the state when it is stored in a frame.
+     *
+     * @return The state length in bytes.
+     */
+    public int getLength();
+
+    /**
+     * Return the state as a Java object.
+     *
+     * @return The state object, or null if no object-based state is set.
+     */
+    public Object getState();
+
+ /**
+ * Set the state.
+ * @param obj
+ */
+ public void setState(Object obj);
+
+ /**
+ * Reset the state.
+ */
+ public void reset();
+}
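
For the object-based path, an IAggregateState can be little more than a boxed-value holder. A hedged sketch of a minimal implementation (the class name is invented; it mirrors the anonymous classes created by the aggregator factories below):

    // Minimal object-based state: one boxed Integer.
    public class IntState implements IAggregateState {
        private static final long serialVersionUID = 1L;
        private Integer value = null;

        public int getLength() { return 4; }          // 4 bytes when frame-resident
        public Object getState() { return value; }
        public void setState(Object obj) { value = (Integer) obj; }
        public void reset() { value = null; }
    }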
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
new file mode 100644
index 0000000..06d5f68
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor for the aggregation of a single field. Implementations may keep
+ * their running state either frame-resident (through the byte[]/offset
+ * arguments) or as a Java object (through {@link IAggregateState}).
+ */
+public interface IFieldAggregateDescriptor {
+
+    /**
+     * Create a fresh aggregate state for a new group.
+     *
+     * @return A new, empty aggregate state.
+     */
+    public IAggregateState createState();
+
+ /**
+ * Initialize the state based on the input tuple.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param fieldOutput
+     *            The data output for the frame containing the state. This may
+     *            be null if the state is maintained as a Java object.
+ * @param state
+ * The state to be initialized.
+ * @throws HyracksDataException
+ */
+ public void init(IFrameTupleAccessor accessor, int tIndex,
+ DataOutput fieldOutput, IAggregateState state)
+ throws HyracksDataException;
+
+ /**
+ * Reset the aggregator. The corresponding aggregate state should be reset
+ * too. Note that here the frame is not an input argument, since it can be
+ * reset outside of the aggregator (simply reset the starting index of the
+ * buffer).
+ *
+ * @param state
+ */
+ public void reset(IAggregateState state);
+
+ /**
+ * Aggregate the value. Aggregate state should be updated correspondingly.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param data
+     *            The buffer containing the state when the state is
+     *            frame-based; null when the state is kept as a Java object.
+ * @param offset
+ * @param state
+ * The aggregate state.
+ * @throws HyracksDataException
+ */
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ byte[] data, int offset, IAggregateState state)
+ throws HyracksDataException;
+
+ /**
+ * Output the partial aggregation result.
+ *
+ * @param fieldOutput
+ * The data output for the output frame
+ * @param data
+ * The buffer containing the aggregation state
+ * @param offset
+ * @param state
+ * The aggregation state.
+ * @throws HyracksDataException
+ */
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data,
+ int offset, IAggregateState state) throws HyracksDataException;
+
+ /**
+ * Output the final aggregation result.
+ *
+ * @param fieldOutput
+ * The data output for the output frame
+ * @param data
+ * The buffer containing the aggregation state
+ * @param offset
+ * @param state
+ * The aggregation state.
+ * @throws HyracksDataException
+ */
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+ int offset, IAggregateState state) throws HyracksDataException;
+
+ public void close();
+
+}
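
The intended call sequence is: createState() once per group, init(...) on the group's first tuple, aggregate(...) on each subsequent tuple, and one of the two output methods when the group is emitted. A hedged sketch of that driver loop using the object-based state path (the method and its arguments are hypothetical; error handling elided):

    // Drives one group through the lifecycle; the frame buffer arguments are
    // null because the state is kept as a Java object.
    void aggregateOneGroup(IFieldAggregateDescriptorFactory factory,
            IHyracksTaskContext ctx, RecordDescriptor inDesc,
            RecordDescriptor outDesc, IFrameTupleAccessor accessor,
            int firstTuple, int[] restTuples, DataOutput out)
            throws HyracksDataException {
        IFieldAggregateDescriptor agg = factory.createAggregator(ctx, inDesc, outDesc);
        IAggregateState state = agg.createState();
        agg.init(accessor, firstTuple, null, state);    // first tuple of the group
        for (int t : restTuples) {
            agg.aggregate(accessor, t, null, 0, state); // fold in the rest
        }
        agg.outputFinalResult(out, null, 0, state);     // emit the group's value
        agg.close();
    }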
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java
new file mode 100644
index 0000000..0e2d530
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Serializable factory for field aggregators. The factory travels with the
+ * job specification; the aggregator itself is created on the task side.
+ */
+public interface IFieldAggregateDescriptorFactory extends Serializable {
+
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java
new file mode 100644
index 0000000..cc8cbd2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public interface ISpillableTable {
+
+ public void close();
+
+ public void reset();
+
+ public int getFrameCount();
+
+ public List<ByteBuffer> getFrames();
+
+ public void sortFrames();
+
+ public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
+
+ public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException;
+}
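
A consumer of ISpillableTable inserts tuples until the memory budget runs out, then sorts, spills, and resets. A hedged sketch of that loop, assuming insert(...) returns false exactly when the table is full ('runWriter', a hypothetical IFrameWriter over a run file, and 'accessor' are illustrative inputs):

    void insertAll(ISpillableTable table, FrameTupleAccessor accessor,
            IFrameWriter runWriter) throws HyracksDataException {
        for (int t = 0; t < accessor.getTupleCount(); t++) {
            if (!table.insert(accessor, t)) {
                table.sortFrames();                 // order groups before spilling
                table.flushFrames(runWriter, true); // true: write partial results
                table.reset();
                if (!table.insert(accessor, t)) {   // retry into the empty table
                    throw new HyracksDataException("Tuple does not fit in an empty table");
                }
            }
        }
    }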
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
new file mode 100644
index 0000000..6420766
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ISpillableTableFactory extends Serializable {
+ ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int[] keyFields,
+ IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
+ IFieldAggregateDescriptorFactory[] aggregatorFactories, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
new file mode 100644
index 0000000..886dd06
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+
+/**
+ * Field aggregator computing the average of an integer field. The partial
+ * state is an 8-byte (sum, count) pair; the final result is a float.
+ */
+public class AvgAggregatorFactory implements IFieldAggregateDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int aggField;
+
+ public AvgAggregatorFactory(int aggField){
+ this.aggField = aggField;
+ }
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+ */
+ @Override
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+
+ return new IFieldAggregateDescriptor() {
+
+ @Override
+ public void reset(IAggregateState state) {
+ state.reset();
+ }
+
+ @Override
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data,
+ int offset, IAggregateState state) throws HyracksDataException {
+ int sum, count;
+ if (data != null) {
+ sum = IntegerSerializerDeserializer.getInt(data, offset);
+ count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+ } else {
+ Integer[] fields = (Integer[])state.getState();
+ sum = fields[0];
+ count = fields[1];
+ }
+ try {
+ fieldOutput.writeInt(sum);
+ fieldOutput.writeInt(count);
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+ int offset, IAggregateState state) throws HyracksDataException {
+ int sum, count;
+ if (data != null) {
+ sum = IntegerSerializerDeserializer.getInt(data, offset);
+ count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+ } else {
+ Integer[] fields = (Integer[])state.getState();
+ sum = fields[0];
+ count = fields[1];
+ }
+ try {
+ fieldOutput.writeFloat((float)sum/count);
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex,
+ DataOutput fieldOutput, IAggregateState state)
+ throws HyracksDataException {
+                int sum = 0;
+                int count = 1; // the initializing tuple itself is counted
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+ sum += IntegerSerializerDeserializer.getInt(accessor
+ .getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
+ if (fieldOutput != null) {
+ try {
+ fieldOutput.writeInt(sum);
+ fieldOutput.writeInt(count);
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when initializing the aggregator.");
+ }
+ } else {
+                    // Integer[] (not Object[]): getState() is cast to Integer[] elsewhere
+                    state.setState(new Integer[] { sum, count });
+ }
+ }
+
+ @Override
+ public IAggregateState createState() {
+ return new IAggregateState() {
+
+ private static final long serialVersionUID = 1L;
+
+ Object state = null;
+
+ @Override
+ public void setState(Object obj) {
+                        state = obj;
+ }
+
+ @Override
+ public void reset() {
+ state = null;
+ }
+
+ @Override
+ public Object getState() {
+ return state;
+ }
+
+ @Override
+ public int getLength() {
+ return 8;
+ }
+ };
+ }
+
+ @Override
+ public void close() {
+                // No resources to release for this aggregator.
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ byte[] data, int offset, IAggregateState state)
+ throws HyracksDataException {
+ int sum = 0, count = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+ sum += IntegerSerializerDeserializer.getInt(accessor
+ .getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
+ count += 1;
+ if (data != null) {
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ sum += buf.getInt(offset);
+ count += buf.getInt(offset + 4);
+ buf.putInt(offset, sum);
+ buf.putInt(offset + 4, count);
+ } else {
+ Integer[] fields = (Integer[])state.getState();
+ sum += fields[0];
+ count += fields[1];
+                    // Integer[] (not Object[]): getState() is cast to Integer[] above
+                    state.setState(new Integer[] { sum, count });
+ }
+ }
+ };
+ }
+
+}
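
The frame-resident state of the average aggregator is a fixed 8-byte layout: a 4-byte running sum followed by a 4-byte running count, which is why getLength() returns 8 and the count is read at offset + 4. A hedged fragment decoding it with plain JDK calls ('data' and 'offset' as in the aggregate(...) signature):

    ByteBuffer buf = ByteBuffer.wrap(data);
    int sum = buf.getInt(offset);        // bytes [offset, offset + 4)
    int count = buf.getInt(offset + 4);  // bytes [offset + 4, offset + 8)
    float avg = (float) sum / count;     // the value outputFinalResult writes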
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
new file mode 100644
index 0000000..6760544
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+
+/**
+ * Merge-side counterpart of {@link AvgAggregatorFactory}: its input tuples
+ * already carry (sum, count) partial pairs, which are combined here.
+ */
+public class AvgMergeAggregatorFactory implements
+ IFieldAggregateDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int aggField;
+
+ public AvgMergeAggregatorFactory(int aggField){
+ this.aggField = aggField;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
+ * IFieldAggregateDescriptorFactory
+ * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+ */
+ @Override
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+ return new IFieldAggregateDescriptor() {
+
+ @Override
+ public void reset(IAggregateState state) {
+ state.reset();
+ }
+
+ @Override
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data,
+ int offset, IAggregateState state) throws HyracksDataException {
+ int sum, count;
+ if (data != null) {
+ sum = IntegerSerializerDeserializer.getInt(data, offset);
+ count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+ } else {
+ Integer[] fields = (Integer[])state.getState();
+ sum = fields[0];
+ count = fields[1];
+ }
+ try {
+ fieldOutput.writeInt(sum);
+ fieldOutput.writeInt(count);
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+ int offset, IAggregateState state) throws HyracksDataException {
+ int sum, count;
+ if (data != null) {
+ sum = IntegerSerializerDeserializer.getInt(data, offset);
+ count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+ } else {
+ Integer[] fields = (Integer[])state.getState();
+ sum = fields[0];
+ count = fields[1];
+ }
+ try {
+ fieldOutput.writeFloat((float)sum/count);
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex,
+ DataOutput fieldOutput, IAggregateState state)
+ throws HyracksDataException {
+ int sum = 0;
+ int count = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+ sum += IntegerSerializerDeserializer.getInt(accessor
+ .getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
+ count += IntegerSerializerDeserializer.getInt(accessor
+ .getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart + 4);
+ if (fieldOutput != null) {
+ try {
+ fieldOutput.writeInt(sum);
+ fieldOutput.writeInt(count);
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when initializing the aggregator.");
+ }
+ } else {
+                    // Integer[] (not Object[]): getState() is cast to Integer[] elsewhere
+                    state.setState(new Integer[] { sum, count });
+ }
+ }
+
+ @Override
+ public IAggregateState createState() {
+ return new IAggregateState() {
+
+ private static final long serialVersionUID = 1L;
+
+ Object state = null;
+
+ @Override
+ public void setState(Object obj) {
+                        state = obj;
+ }
+
+ @Override
+ public void reset() {
+ state = null;
+ }
+
+ @Override
+ public Object getState() {
+ return state;
+ }
+
+ @Override
+ public int getLength() {
+ return 8;
+ }
+ };
+ }
+
+ @Override
+ public void close() {
+                // No resources to release for this aggregator.
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ byte[] data, int offset, IAggregateState state)
+ throws HyracksDataException {
+                int sum = 0, count = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                        + fieldStart);
+                // Merge the partial count carried by the input tuple,
+                // rather than counting the tuple itself.
+                count += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                        + fieldStart + 4);
+ if (data != null) {
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ sum += buf.getInt(offset);
+ count += buf.getInt(offset + 4);
+ buf.putInt(offset, sum);
+ buf.putInt(offset + 4, count);
+ } else {
+ Integer[] fields = (Integer[])state.getState();
+ sum += fields[0];
+ count += fields[1];
+                    // Integer[] (not Object[]): getState() is cast to Integer[] above
+                    state.setState(new Integer[] { sum, count });
+ }
+ }
+ };
+ }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
new file mode 100644
index 0000000..3124ae6
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+
+/**
+ * Field aggregator computing the running sum of an integer field. This is the
+ * simple reference implementation of the new aggregator interfaces.
+ */
+public class IntSumAggregatorFactory implements
+ IFieldAggregateDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int aggField;
+
+ public IntSumAggregatorFactory(int aggField) {
+ this.aggField = aggField;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
+ * IFieldAggregateDescriptorFactory
+ * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+ */
+ @Override
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+
+ return new IFieldAggregateDescriptor() {
+
+ @Override
+ public void reset(IAggregateState state) {
+ state.reset();
+ }
+
+ @Override
+ public void outputPartialResult(DataOutput fieldOutput,
+ byte[] data, int offset, IAggregateState state)
+ throws HyracksDataException {
+ int sum;
+ if (data != null) {
+ sum = IntegerSerializerDeserializer.getInt(data, offset);
+ } else {
+ sum = (Integer) (state.getState());
+ }
+ try {
+ fieldOutput.writeInt(sum);
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+ int offset, IAggregateState state)
+ throws HyracksDataException {
+ int sum;
+ if (data != null) {
+ sum = IntegerSerializerDeserializer.getInt(data, offset);
+ } else {
+ sum = (Integer) (state.getState());
+ }
+ try {
+ fieldOutput.writeInt(sum);
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex,
+ DataOutput fieldOutput, IAggregateState state)
+ throws HyracksDataException {
+ int sum = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+ sum += IntegerSerializerDeserializer.getInt(accessor
+ .getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
+ if (fieldOutput != null) {
+ try {
+ fieldOutput.writeInt(sum);
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when initializing the aggregator.");
+ }
+ } else {
+                    state.setState(Integer.valueOf(sum));
+ }
+ }
+
+ @Override
+ public IAggregateState createState() {
+ return new IAggregateState() {
+
+ private static final long serialVersionUID = 1L;
+
+ Integer sum = null;
+
+ public int getLength() {
+ return 4;
+ }
+
+ public Object getState() {
+ return sum;
+ }
+
+ public void setState(Object obj) {
+                        sum = (Integer) obj;
+ }
+
+ public void reset() {
+ sum = null;
+ }
+
+ };
+ }
+
+ @Override
+ public void close() {
+                // No resources to release for this aggregator.
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ byte[] data, int offset, IAggregateState state)
+ throws HyracksDataException {
+ int sum = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+ sum += IntegerSerializerDeserializer.getInt(accessor
+ .getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
+ if (data != null) {
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ sum += buf.getInt(offset);
+ buf.putInt(offset, sum);
+ } else {
+ sum += (Integer) (state.getState());
+                    state.setState(Integer.valueOf(sum));
+ }
+ }
+ };
+ }
+
+}
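
As the commit message notes, this integer-sum aggregator is the reference usage of the new interfaces. A hedged sketch of instantiating it ('ctx', 'inRecDesc', and 'outRecDesc' are hypothetical task-side values; this sums field 1 of each record):

    IFieldAggregateDescriptorFactory[] aggFactories =
            new IFieldAggregateDescriptorFactory[] { new IntSumAggregatorFactory(1) };
    // The factory is Serializable and ships with the job specification;
    // the aggregator itself is created on the task side:
    IFieldAggregateDescriptor sumAgg =
            aggFactories[0].createAggregator(ctx, inRecDesc, outRecDesc);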