Pre-merge cleanup: removed old aggregator interfaces, and updated codes to use the new interface.
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@963 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
deleted file mode 100644
index 2327965..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,702 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
-
-/**
- *
- */
-public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
- private static final int AGGREGATE_ACTIVITY_ID = 0;
-
- private static final int MERGE_ACTIVITY_ID = 1;
-
- private static final long serialVersionUID = 1L;
- private final int[] keyFields;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final INormalizedKeyComputerFactory firstNormalizerFactory;
-
- private final IAggregatorDescriptorFactory aggregatorFactory;
- private final IAggregatorDescriptorFactory mergerFactory;
-
- private final int framesLimit;
- private final ISpillableTableFactory spillableTableFactory;
- private final boolean isOutputSorted;
-
- public ExternalGroupOperatorDescriptor(JobSpecification spec,
- int[] keyFields, int framesLimit,
- IBinaryComparatorFactory[] comparatorFactories,
- INormalizedKeyComputerFactory firstNormalizerFactory,
- IAggregatorDescriptorFactory aggregatorFactory,
- IAggregatorDescriptorFactory mergerFactory,
- RecordDescriptor recordDescriptor,
- ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
- super(spec, 1, 1);
- this.framesLimit = framesLimit;
- if (framesLimit <= 1) {
- /**
- * Minimum of 2 frames: 1 for input records, and 1 for output
- * aggregation results.
- */
- throw new IllegalStateException(
- "frame limit should at least be 2, but it is "
- + framesLimit + "!");
- }
- this.aggregatorFactory = aggregatorFactory;
- this.mergerFactory = mergerFactory;
- this.keyFields = keyFields;
- this.comparatorFactories = comparatorFactories;
- this.firstNormalizerFactory = firstNormalizerFactory;
- this.spillableTableFactory = spillableTableFactory;
- this.isOutputSorted = isOutputSorted;
-
- /**
- * Set the record descriptor. Note that since this operator is a unary
- * operator, only the first record descriptor is used here.
- */
- recordDescriptors[0] = recordDescriptor;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
- * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
- */
- @Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(
- getOperatorId(), AGGREGATE_ACTIVITY_ID));
- MergeActivity mergeAct = new MergeActivity(new ActivityId(odId,
- MERGE_ACTIVITY_ID));
-
- builder.addActivity(aggregateAct);
- builder.addSourceEdge(0, aggregateAct, 0);
-
- builder.addActivity(mergeAct);
- builder.addTargetEdge(0, mergeAct, 0);
-
- builder.addBlockingEdge(aggregateAct, mergeAct);
- }
-
- public static class AggregateActivityState extends AbstractTaskState {
- private LinkedList<RunFileReader> runs;
-
- private ISpillableTable gTable;
-
- public AggregateActivityState() {
- }
-
- private AggregateActivityState(JobId jobId, TaskId tId) {
- super(jobId, tId);
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
- throw new UnsupportedOperationException();
- }
- }
-
- private class AggregateActivity extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- public AggregateActivity(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions)
- throws HyracksDataException {
- final FrameTupleAccessor accessor = new FrameTupleAccessor(
- ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0));
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private AggregateActivityState state;
-
- @Override
- public void open() throws HyracksDataException {
- state = new AggregateActivityState(ctx.getJobletContext()
- .getJobId(), new TaskId(getActivityId(), partition));
- state.runs = new LinkedList<RunFileReader>();
- state.gTable = spillableTableFactory.buildSpillableTable(
- ctx, keyFields, comparatorFactories,
- firstNormalizerFactory, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0), recordDescriptors[0],
- ExternalGroupOperatorDescriptor.this.framesLimit);
- state.gTable.reset();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- /**
- * If the group table is too large, flush the table into
- * a run file.
- */
- if (!state.gTable.insert(accessor, i)) {
- flushFramesToRun();
- if (!state.gTable.insert(accessor, i))
- throw new HyracksDataException(
- "Failed to insert a new buffer into the aggregate operator!");
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
-
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (state.gTable.getFrameCount() >= 0) {
- if (state.runs.size() > 0) {
- /**
- * flush the memory into the run file.
- */
- flushFramesToRun();
- state.gTable.close();
- state.gTable = null;
- }
- }
- ctx.setTaskState(state);
- }
-
- private void flushFramesToRun() throws HyracksDataException {
- FileReference runFile;
- try {
- runFile = ctx.getJobletContext()
- .createManagedWorkspaceFile(
- ExternalGroupOperatorDescriptor.class
- .getSimpleName());
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- RunFileWriter writer = new RunFileWriter(runFile,
- ctx.getIOManager());
- writer.open();
- try {
- state.gTable.sortFrames();
- state.gTable.flushFrames(writer, true);
- } catch (Exception ex) {
- throw new HyracksDataException(ex);
- } finally {
- writer.close();
- }
- state.gTable.reset();
- state.runs.add(((RunFileWriter) writer).createReader());
- }
- };
- return op;
- }
- }
-
- private class MergeActivity extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- public MergeActivity(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions)
- throws HyracksDataException {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i]
- .createBinaryComparator();
- }
-
- int[] keyFieldsInPartialResults = new int[keyFields.length];
- for(int i = 0; i < keyFieldsInPartialResults.length; i++){
- keyFieldsInPartialResults[i] = i;
- }
-
- final IAggregatorDescriptor aggregator = mergerFactory
- .createAggregator(ctx, recordDescriptors[0],
- recordDescriptors[0], keyFields, keyFieldsInPartialResults);
- final AggregateState aggregateState = aggregator
- .createAggregateStates();
-
- final int[] storedKeys = new int[keyFields.length];
- /**
- * Get the list of the fields in the stored records.
- */
- for (int i = 0; i < keyFields.length; ++i) {
- storedKeys[i] = i;
- }
-
- IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- /**
- * Input frames, one for each run file.
- */
- private List<ByteBuffer> inFrames;
-
- /**
- * Output frame.
- */
- private ByteBuffer outFrame, writerFrame;
-
- private LinkedList<RunFileReader> runs;
-
- private AggregateActivityState aggState;
-
- /**
- * how many frames to be read ahead once
- */
- private int runFrameLimit = 1;
-
- private int[] currentFrameIndexInRun;
- private int[] currentRunFrames;
- private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(
- ctx.getFrameSize());
- private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
- ctx.getFrameSize(), recordDescriptors[0]);
- private FrameTupleAppender writerFrameAppender;
-
- public void initialize() throws HyracksDataException {
- aggState = (AggregateActivityState) ctx
- .getTaskState(new TaskId(new ActivityId(
- getOperatorId(), AGGREGATE_ACTIVITY_ID),
- partition));
- runs = aggState.runs;
- writer.open();
- try {
- if (runs.size() <= 0) {
- ISpillableTable gTable = aggState.gTable;
- if (gTable != null) {
- if (isOutputSorted)
- gTable.sortFrames();
- gTable.flushFrames(writer, false);
- }
- } else {
- runs = new LinkedList<RunFileReader>(runs);
- inFrames = new ArrayList<ByteBuffer>();
- outFrame = ctx.allocateFrame();
- outFrameAppender.reset(outFrame, true);
- outFrameAccessor.reset(outFrame);
- while (runs.size() > 0) {
- try {
- doPass(runs);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
- inFrames.clear();
- }
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- aggregateState.close();
- writer.close();
- }
- }
-
- private void doPass(LinkedList<RunFileReader> runs)
- throws HyracksDataException {
- FileReference newRun = null;
- IFrameWriter writer = this.writer;
- boolean finalPass = false;
-
- while (inFrames.size() + 2 < framesLimit) {
- inFrames.add(ctx.allocateFrame());
- }
- int runNumber;
- if (runs.size() + 2 <= framesLimit) {
- finalPass = true;
- runFrameLimit = (framesLimit - 2) / runs.size();
- runNumber = runs.size();
- } else {
- runNumber = framesLimit - 2;
- newRun = ctx.getJobletContext()
- .createManagedWorkspaceFile(
- ExternalGroupOperatorDescriptor.class
- .getSimpleName());
- writer = new RunFileWriter(newRun, ctx.getIOManager());
- writer.open();
- }
- try {
- currentFrameIndexInRun = new int[runNumber];
- currentRunFrames = new int[runNumber];
- /**
- * Create file readers for each input run file, only for
- * the ones fit into the inFrames
- */
- RunFileReader[] runFileReaders = new RunFileReader[runNumber];
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
- .size()];
- Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
- ctx.getFrameSize(), recordDescriptors[0],
- runNumber, comparator);
- /**
- * current tuple index in each run
- */
- int[] tupleIndices = new int[runNumber];
-
- for (int runIndex = runNumber - 1; runIndex >= 0; runIndex--) {
- tupleIndices[runIndex] = 0;
- // Load the run file
- runFileReaders[runIndex] = runs.get(runIndex);
- runFileReaders[runIndex].open();
-
- currentRunFrames[runIndex] = 0;
- currentFrameIndexInRun[runIndex] = runIndex
- * runFrameLimit;
- for (int j = 0; j < runFrameLimit; j++) {
- int frameIndex = currentFrameIndexInRun[runIndex]
- + j;
- if (runFileReaders[runIndex].nextFrame(inFrames
- .get(frameIndex))) {
- tupleAccessors[frameIndex] = new FrameTupleAccessor(
- ctx.getFrameSize(),
- recordDescriptors[0]);
- tupleAccessors[frameIndex].reset(inFrames
- .get(frameIndex));
- currentRunFrames[runIndex]++;
- if (j == 0)
- setNextTopTuple(runIndex, tupleIndices,
- runFileReaders, tupleAccessors,
- topTuples);
- } else {
- closeRun(runIndex, runFileReaders, tupleAccessors);
- break;
- }
- }
- }
-
- /**
- * Start merging
- */
- while (!topTuples.areRunsExhausted()) {
- /**
- * Get the top record
- */
- ReferenceEntry top = topTuples.peek();
- int tupleIndex = top.getTupleIndex();
- int runIndex = topTuples.peek().getRunid();
- FrameTupleAccessor fta = top.getAccessor();
-
- int currentTupleInOutFrame = outFrameAccessor
- .getTupleCount() - 1;
- if (currentTupleInOutFrame < 0
- || compareFrameTuples(fta, tupleIndex,
- outFrameAccessor,
- currentTupleInOutFrame) != 0) {
- /**
- * Initialize the first output record Reset the
- * tuple builder
- */
- if (!aggregator.initFromPartial(outFrameAppender, fta,
- tupleIndex, aggregateState)) {
- flushOutFrame(writer, finalPass);
- if (!aggregator.initFromPartial(outFrameAppender, fta,
- tupleIndex, aggregateState)) {
- throw new HyracksDataException(
- "Failed to append an aggregation result to the output frame.");
- }
- }
-
- } else {
- /**
- * if new tuple is in the same group of the
- * current aggregator do merge and output to the
- * outFrame
- */
-
- aggregator.aggregate(fta, tupleIndex,
- outFrameAccessor,
- currentTupleInOutFrame, aggregateState);
-
- }
- tupleIndices[runIndex]++;
- setNextTopTuple(runIndex, tupleIndices,
- runFileReaders, tupleAccessors, topTuples);
- }
-
- if (outFrameAppender.getTupleCount() > 0) {
- flushOutFrame(writer, finalPass);
- }
-
- aggregator.close();
-
- runs.subList(0, runNumber).clear();
- /**
- * insert the new run file into the beginning of the run
- * file list
- */
- if (!finalPass) {
- runs.add(0, ((RunFileWriter) writer).createReader());
- }
- } finally {
- if (!finalPass) {
- writer.close();
- }
- }
- }
-
- private void flushOutFrame(IFrameWriter writer, boolean isFinal)
- throws HyracksDataException {
- if (writerFrame == null) {
- writerFrame = ctx.allocateFrame();
- }
- if (writerFrameAppender == null) {
- writerFrameAppender = new FrameTupleAppender(
- ctx.getFrameSize());
- writerFrameAppender.reset(writerFrame, true);
- }
- outFrameAccessor.reset(outFrame);
-
- for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-
- if(isFinal){
- if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
- FrameUtils.flushFrame(writerFrame, writer);
- writerFrameAppender.reset(writerFrame, true);
- if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
- throw new HyracksDataException(
- "Failed to write final aggregation result to a writer frame!");
- }
- }
- } else {
- if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
- FrameUtils.flushFrame(writerFrame, writer);
- writerFrameAppender.reset(writerFrame, true);
- if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
- throw new HyracksDataException(
- "Failed to write final aggregation result to a writer frame!");
- }
- }
- }
- }
- if (writerFrameAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerFrameAppender.reset(writerFrame, true);
- }
- outFrameAppender.reset(outFrame, true);
- }
-
- private void setNextTopTuple(int runIndex, int[] tupleIndices,
- RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors,
- ReferencedPriorityQueue topTuples)
- throws HyracksDataException {
- int runStart = runIndex * runFrameLimit;
- boolean existNext = false;
- if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null
- || runCursors[runIndex] == null) {
- /**
- * run already closed
- */
- existNext = false;
- } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
- /**
- * not the last frame for this run
- */
- existNext = true;
- if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]]
- .getTupleCount()) {
- tupleIndices[runIndex] = 0;
- currentFrameIndexInRun[runIndex]++;
- }
- } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]]
- .getTupleCount()) {
- /**
- * the last frame has expired
- */
- existNext = true;
- } else {
- /**
- * If all tuples in the targeting frame have been
- * checked.
- */
- int frameOffset = runIndex * runFrameLimit;
- tupleIndices[runIndex] = 0;
- currentFrameIndexInRun[runIndex] = frameOffset;
- /**
- * read in batch
- */
- currentRunFrames[runIndex] = 0;
- for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
- ByteBuffer buffer = tupleAccessors[frameOffset]
- .getBuffer();
- if (runCursors[runIndex].nextFrame(buffer)) {
- tupleAccessors[frameOffset].reset(buffer);
- if (tupleAccessors[frameOffset].getTupleCount() > 0) {
- existNext = true;
- } else {
- throw new IllegalStateException(
- "illegal: empty run file");
- }
- currentRunFrames[runIndex]++;
- } else {
- break;
- }
- }
- }
-
- if (existNext) {
- topTuples
- .popAndReplace(
- tupleAccessors[currentFrameIndexInRun[runIndex]],
- tupleIndices[runIndex]);
- } else {
- topTuples.pop();
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- /**
- * Close the run file, and also the corresponding readers and
- * input frame.
- *
- * @param index
- * @param runCursors
- * @param tupleAccessor
- * @throws HyracksDataException
- */
- private void closeRun(int index, RunFileReader[] runCursors,
- IFrameTupleAccessor[] tupleAccessor)
- throws HyracksDataException {
- if (runCursors[index] != null) {
- runCursors[index].close();
- runCursors[index] = null;
- tupleAccessor[index] = null;
- }
- }
-
- private int compareFrameTuples(IFrameTupleAccessor fta1,
- int j1, IFrameTupleAccessor fta2, int j2) {
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < keyFields.length; ++f) {
- int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1)
- + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldLength(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2)
- + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2_start = fta2.getFieldStartOffset(j2, fIdx);
- int l2_end = fta2.getFieldEndOffset(j2, fIdx);
- int l2 = l2_end - l2_start;
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
- };
- return op;
- }
-
- private Comparator<ReferenceEntry> createEntryComparator(
- final IBinaryComparator[] comparators) {
- return new Comparator<ReferenceEntry>() {
-
- @Override
- public int compare(ReferenceEntry o1, ReferenceEntry o2) {
- FrameTupleAccessor fta1 = (FrameTupleAccessor) o1
- .getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) o2
- .getAccessor();
- int j1 = o1.getTupleIndex();
- int j2 = o2.getTupleIndex();
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < keyFields.length; ++f) {
- int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1)
- + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx)
- - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2)
- + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx)
- - fta2.getFieldStartOffset(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
-
- };
- }
-
- }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
deleted file mode 100644
index d5edd25..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-
-class GroupingHashTable {
- /**
- * The pointers in the link store 3 int values for each entry in the
- * hashtable: (bufferIdx, tIndex, accumulatorIdx).
- *
- * @author vinayakb
- */
- private static class Link {
- private static final int INIT_POINTERS_SIZE = 9;
-
- int[] pointers;
- int size;
-
- Link() {
- pointers = new int[INIT_POINTERS_SIZE];
- size = 0;
- }
-
- void add(int bufferIdx, int tIndex, int accumulatorIdx) {
- while (size + 3 > pointers.length) {
- pointers = Arrays.copyOf(pointers, pointers.length * 2);
- }
- pointers[size++] = bufferIdx;
- pointers[size++] = tIndex;
- pointers[size++] = accumulatorIdx;
- }
- }
-
- private static final int INIT_AGG_STATE_SIZE = 8;
- private final IHyracksTaskContext ctx;
- private final FrameTupleAppender appender;
- private final List<ByteBuffer> buffers;
- private final Link[] table;
- /**
- * Aggregate states: a list of states for all groups maintained in the main
- * memory.
- */
- private AggregateState[] aggregateStates;
- private int accumulatorSize;
-
- private int lastBIndex;
- private final int[] storedKeys;
- private final IBinaryComparator[] comparators;
- private final FrameTuplePairComparator ftpc;
- private final ITuplePartitionComputer tpc;
- private final IAggregatorDescriptor aggregator;
-
- private final FrameTupleAccessor storedKeysAccessor;
-
- GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
- IBinaryComparatorFactory[] comparatorFactories,
- ITuplePartitionComputerFactory tpcf,
- IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int tableSize)
- throws HyracksDataException {
- this.ctx = ctx;
- appender = new FrameTupleAppender(ctx.getFrameSize());
- buffers = new ArrayList<ByteBuffer>();
- table = new Link[tableSize];
-
- storedKeys = new int[fields.length];
- @SuppressWarnings("rawtypes")
- ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
- for (int i = 0; i < fields.length; ++i) {
- storedKeys[i] = i;
- storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
- }
-
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
- tpc = tpcf.createPartitioner();
-
- int[] keyFieldsInPartialResults = new int[fields.length];
- for(int i = 0; i < keyFieldsInPartialResults.length; i++){
- keyFieldsInPartialResults[i] = i;
- }
-
- this.aggregator = aggregatorFactory.createAggregator(ctx,
- inRecordDescriptor, outRecordDescriptor, fields, keyFieldsInPartialResults);
-
- this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
- accumulatorSize = 0;
-
- RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
- storedKeySerDeser);
- storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- storedKeysRecordDescriptor);
- lastBIndex = -1;
- addNewBuffer();
- }
-
- private void addNewBuffer() {
- ByteBuffer buffer = ctx.allocateFrame();
- buffer.position(0);
- buffer.limit(buffer.capacity());
- buffers.add(buffer);
- appender.reset(buffer, true);
- ++lastBIndex;
- }
-
- private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
- throws HyracksDataException {
- ByteBuffer frame = appender.getBuffer();
- frame.position(0);
- frame.limit(frame.capacity());
- writer.nextFrame(appender.getBuffer());
- appender.reset(appender.getBuffer(), true);
- }
-
- void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
- int entry = tpc.partition(accessor, tIndex, table.length);
- Link link = table[entry];
- if (link == null) {
- link = table[entry] = new Link();
- }
- int saIndex = -1;
- for (int i = 0; i < link.size; i += 3) {
- int sbIndex = link.pointers[i];
- int stIndex = link.pointers[i + 1];
- storedKeysAccessor.reset(buffers.get(sbIndex));
- int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
- if (c == 0) {
- saIndex = link.pointers[i + 2];
- break;
- }
- }
- if (saIndex < 0) {
- // Did not find the key. Insert a new entry.
- saIndex = accumulatorSize++;
- // Add keys
-
- // Add index to the keys in frame
- int sbIndex = lastBIndex;
- int stIndex = appender.getTupleCount();
-
- // Add aggregation fields
- AggregateState newState = aggregator.createAggregateStates();
-
- if(!aggregator.init(appender, accessor, tIndex, newState)){
- addNewBuffer();
- sbIndex = lastBIndex;
- stIndex = appender.getTupleCount();
- if(!aggregator.init(appender, accessor, tIndex, newState)){
- throw new IllegalStateException();
- }
- }
-
- if (accumulatorSize >= aggregateStates.length) {
- aggregateStates = Arrays.copyOf(aggregateStates,
- aggregateStates.length * 2);
- }
-
- aggregateStates[saIndex] = newState;
-
- link.add(sbIndex, stIndex, saIndex);
- } else {
- aggregator.aggregate(accessor, tIndex, null, 0,
- aggregateStates[saIndex]);
- }
- }
-
- void write(IFrameWriter writer) throws HyracksDataException {
- ByteBuffer buffer = ctx.allocateFrame();
- appender.reset(buffer, true);
- for (int i = 0; i < table.length; ++i) {
- Link link = table[i];
- if (link != null) {
- for (int j = 0; j < link.size; j += 3) {
- int bIndex = link.pointers[j];
- int tIndex = link.pointers[j + 1];
- int aIndex = link.pointers[j + 2];
- ByteBuffer keyBuffer = buffers.get(bIndex);
- storedKeysAccessor.reset(keyBuffer);
-
- while (!aggregator
- .outputFinalResult(appender, storedKeysAccessor,
- tIndex, aggregateStates[aIndex])) {
- flushFrame(appender, writer);
- }
- }
- }
- }
- if (appender.getTupleCount() != 0) {
- flushFrame(appender, writer);
- }
- }
-
- void close() throws HyracksDataException {
- for(AggregateState aState : aggregateStates){
- aState.close();
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
deleted file mode 100644
index 2596b91..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- *
- */
-public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
- private static final int HASH_BUILD_ACTIVITY_ID = 0;
-
- private static final int OUTPUT_ACTIVITY_ID = 1;
-
- private static final long serialVersionUID = 1L;
-
- private final int[] keys;
- private final ITuplePartitionComputerFactory tpcf;
- private final IBinaryComparatorFactory[] comparatorFactories;
-
- private final IAggregatorDescriptorFactory aggregatorFactory;
-
- private final int tableSize;
-
- public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys,
- ITuplePartitionComputerFactory tpcf,
- IBinaryComparatorFactory[] comparatorFactories,
- IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor outRecordDescriptor, int tableSize) {
- super(spec, 1, 1);
- this.keys = keys;
- this.tpcf = tpcf;
- this.comparatorFactories = comparatorFactories;
- this.aggregatorFactory = aggregatorFactory;
- recordDescriptors[0] = outRecordDescriptor;
- this.tableSize = tableSize;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
- * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
- */
- @Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId,
- HASH_BUILD_ACTIVITY_ID));
- builder.addActivity(ha);
-
- OutputActivity oa = new OutputActivity(new ActivityId(odId,
- OUTPUT_ACTIVITY_ID));
- builder.addActivity(oa);
-
- builder.addSourceEdge(0, ha, 0);
- builder.addTargetEdge(0, oa, 0);
- builder.addBlockingEdge(ha, oa);
- }
-
- public static class HashBuildActivityState extends AbstractTaskState {
- private GroupingHashTable table;
-
- public HashBuildActivityState() {
- }
-
- private HashBuildActivityState(JobId jobId, TaskId tId) {
- super(jobId, tId);
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
-
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
-
- }
- }
-
- private class HashBuildActivity extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- public HashBuildActivity(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions) {
- final FrameTupleAccessor accessor = new FrameTupleAccessor(
- ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0));
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- private HashBuildActivityState state;
-
- @Override
- public void open() throws HyracksDataException {
- state = new HashBuildActivityState(ctx.getJobletContext()
- .getJobId(), new TaskId(getActivityId(), partition));
- state.table = new GroupingHashTable(ctx, keys,
- comparatorFactories, tpcf, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0), recordDescriptors[0],
- tableSize);
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; ++i) {
- try {
- state.table.insert(accessor, i);
- } catch (Exception e) {
- System.out.println(e.toString());
- throw new HyracksDataException(e);
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- ctx.setTaskState(state);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- throw new HyracksDataException(
- "HashGroupOperator is failed.");
- }
- };
- }
- }
-
- private class OutputActivity extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- public OutputActivity(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions) {
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
- @Override
- public void initialize() throws HyracksDataException {
- HashBuildActivityState buildState = (HashBuildActivityState) ctx
- .getTaskState(new TaskId(new ActivityId(
- getOperatorId(), HASH_BUILD_ACTIVITY_ID),
- partition));
- GroupingHashTable table = buildState.table;
- writer.open();
- try {
- table.write(writer);
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
- }
- }
- };
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java
deleted file mode 100644
index cc8cbd2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public interface ISpillableTable {
-
- public void close();
-
- public void reset();
-
- public int getFrameCount();
-
- public List<ByteBuffer> getFrames();
-
- public void sortFrames();
-
- public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
-
- public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException;
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
deleted file mode 100644
index bae041d..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface ISpillableTableFactory extends Serializable {
- ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int[] keyFields,
- IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
- IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
deleted file mode 100644
index 1c1c7a2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-public class PreclusteredGroupOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
- private final int[] groupFields;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final IAggregatorDescriptorFactory aggregatorFactory;
-
- private static final long serialVersionUID = 1L;
-
- public PreclusteredGroupOperatorDescriptor(JobSpecification spec,
- int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
- IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor recordDescriptor) {
- super(spec, 1, 1);
- this.groupFields = groupFields;
- this.comparatorFactories = comparatorFactories;
- this.aggregatorFactory = aggregatorFactory;
- recordDescriptors[0] = recordDescriptor;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) throws HyracksDataException {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- final RecordDescriptor inRecordDesc = recordDescProvider
- .getInputRecordDescriptor(getOperatorId(), 0);
- final IAggregatorDescriptor aggregator = aggregatorFactory
- .createAggregator(ctx, inRecordDesc, recordDescriptors[0],
- groupFields, groupFields);
- final ByteBuffer copyFrame = ctx.allocateFrame();
- final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(
- ctx.getFrameSize(), inRecordDesc);
- copyFrameAccessor.reset(copyFrame);
- ByteBuffer outFrame = ctx.allocateFrame();
- final FrameTupleAppender appender = new FrameTupleAppender(
- ctx.getFrameSize());
- appender.reset(outFrame, true);
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private PreclusteredGroupWriter pgw;
-
- @Override
- public void open() throws HyracksDataException {
- pgw = new PreclusteredGroupWriter(ctx, groupFields,
- comparators, aggregator, inRecordDesc, writer);
- pgw.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
- pgw.nextFrame(buffer);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- pgw.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- pgw.close();
- }
- };
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
deleted file mode 100644
index db6e4ab..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class PreclusteredGroupWriter implements IFrameWriter {
- private final int[] groupFields;
- private final IBinaryComparator[] comparators;
- private final IAggregatorDescriptor aggregator;
- private final AggregateState aggregateState;
- private final IFrameWriter writer;
- private final ByteBuffer copyFrame;
- private final FrameTupleAccessor inFrameAccessor;
- private final FrameTupleAccessor copyFrameAccessor;
- private final ByteBuffer outFrame;
- private final FrameTupleAppender appender;
- private boolean first;
-
- public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
- IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
- this.groupFields = groupFields;
- this.comparators = comparators;
- this.aggregator = aggregator;
- this.aggregateState = aggregator.createAggregateStates();
- this.writer = writer;
- copyFrame = ctx.allocateFrame();
- inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
- copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
- copyFrameAccessor.reset(copyFrame);
- outFrame = ctx.allocateFrame();
- appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(outFrame, true);
- }
-
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- first = true;
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- inFrameAccessor.reset(buffer);
- int nTuples = inFrameAccessor.getTupleCount();
- for (int i = 0; i < nTuples; ++i) {
- if (first) {
- aggregator.init(null, inFrameAccessor, i, aggregateState);
- first = false;
- } else {
- if (i == 0) {
- switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
- } else {
- switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
- }
-
- }
- }
- FrameUtils.copy(buffer, copyFrame);
- }
-
- private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
- FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
- if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
- writeOutput(prevTupleAccessor, prevTupleIndex);
- aggregator.init(null, currTupleAccessor, currTupleIndex, aggregateState);
- } else {
- aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
- }
- }
-
- private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
- throws HyracksDataException {
- if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
- FrameUtils.flushFrame(appender.getBuffer(), writer);
- appender.reset(appender.getBuffer(), true);
- if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
- throw new IllegalStateException();
- }
- }
- }
-
- private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
- for (int i = 0; i < comparators.length; ++i) {
- int fIdx = groupFields[i];
- int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
- int l1 = a1.getFieldLength(t1Idx, fIdx);
- int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
- int l2 = a2.getFieldLength(t2Idx, fIdx);
- if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (!first) {
- writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(appender.getBuffer(), writer);
- }
- }
- aggregateState.close();
- writer.close();
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
deleted file mode 100644
index 45d49e2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-
-public class AvgAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
- private final int avgField;
- private int outField = -1;
-
- public AvgAggregatorDescriptorFactory(int avgField) {
- this.avgField = avgField;
- }
-
- public AvgAggregatorDescriptorFactory(int avgField, int outField) {
- this.avgField = avgField;
- this.outField = outField;
- }
-
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException {
-
- if (this.outField < 0)
- this.outField = keyFields.length;
-
- return new IAggregatorDescriptor() {
-
- @Override
- public void reset() {
-
- }
-
- @Override
- public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
- int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
- int count = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
-
- try {
- tupleBuilder.getDataOutput().writeInt(sum / count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException();
- }
- }
-
- @Override
- public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
- int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
- int count = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
- try {
- tupleBuilder.getDataOutput().writeInt(sum);
- tupleBuilder.getDataOutput().writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException();
- }
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- // Init aggregation value
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, avgField);
- int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
- int count = 1;
-
- try {
- tupleBuilder.getDataOutput().writeInt(sum);
- tupleBuilder.getDataOutput().writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException();
- }
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
- int sum1 = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
- int count1 = 1;
-
- int sum2 = IntegerSerializerDeserializer.getInt(data, offset);
- int count2 = IntegerSerializerDeserializer.getInt(data, offset + 4);
-
- ByteBuffer buf = ByteBuffer.wrap(data, offset, 8);
- buf.putInt(sum1 + sum2);
- buf.putInt(count1 + count2);
-
- return 8;
- }
- };
- }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
deleted file mode 100644
index 1cc3340..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-
-public class ConcatAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
- private static final int INIT_ACCUMULATORS_SIZE = 8;
- private final int concatField;
- private int outField = -1;
-
- /**
- * Initialize the aggregator, with the field to be concatenated.
- *
- * @param concatField
- */
- public ConcatAggregatorDescriptorFactory(int concatField) {
- this.concatField = concatField;
- }
-
- /**
- * Initialize the aggregator, with the field index to be concatenated, and
- * also the field where the aggregation result will be outputted.
- *
- * @param concatField
- * @param outField
- */
- public ConcatAggregatorDescriptorFactory(int concatField, int outField) {
- this.concatField = concatField;
- this.outField = outField;
- }
-
- /**
- * Create a concatenation aggregator. A byte buffer will be allocated inside of the
- * aggregator to contain the partial aggregation results. A reference will be written
- * onto the output frame for indexing the aggregation result from the buffer.
- */
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
-
- if (this.outField < 0)
- this.outField = keyFields.length;
-
- return new IAggregatorDescriptor() {
-
- byte[][] buf = new byte[INIT_ACCUMULATORS_SIZE][];
-
- int currentAggregatorIndex = -1;
- int aggregatorCount = 0;
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- // Initialize the aggregation value
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
- int fieldLength = accessor.getFieldLength(tIndex, concatField);
- int appendOffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
- // Get the initial value
- currentAggregatorIndex++;
- if (currentAggregatorIndex >= buf.length) {
- byte[][] newBuf = new byte[buf.length * 2][];
- for (int i = 0; i < buf.length; i++) {
- newBuf[i] = buf[i];
- }
- this.buf = newBuf;
- }
- buf[currentAggregatorIndex] = new byte[fieldLength];
- System.arraycopy(accessor.getBuffer().array(), appendOffset, buf[currentAggregatorIndex], 0,
- fieldLength);
- // Update the aggregator index
- aggregatorCount++;
-
- try {
- tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentAggregatorIndex);
- } catch (IOException e) {
- throw new HyracksDataException();
- }
- }
-
- @Override
- public void reset() {
- currentAggregatorIndex = -1;
- aggregatorCount = 0;
- }
-
- @Override
- public void close() {
- currentAggregatorIndex = -1;
- aggregatorCount = 0;
- for (int i = 0; i < buf.length; i++) {
- buf[i] = null;
- }
- }
-
- @Override
- public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
- throws HyracksDataException {
- int refIndex = IntegerSerializerDeserializer.getInt(data, offset);
- // FIXME Should be done in binary way
- StringBuilder sbder = new StringBuilder();
- sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
- new ByteArrayInputStream(buf[refIndex]))));
- // Get the new data
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
- int fieldLength = accessor.getFieldLength(tIndex, concatField);
- sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
- new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset
- + accessor.getFieldSlotsLength() + fieldStart, fieldLength))));
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- UTF8StringSerializerDeserializer.INSTANCE.serialize(sbder.toString(), new DataOutputStream(baos));
- buf[refIndex] = baos.toByteArray();
- return 4;
- }
-
- @Override
- public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
- int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset
- + accessor.getFieldSlotsLength() + fieldStart);
-
- try {
- if (refIndex >= 0)
- tupleBuilder.getDataOutput().write(buf[refIndex]);
- else {
- int fieldLength = accessor.getFieldLength(tIndex, outField);
- tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4, fieldLength - 4);
- }
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException();
- }
- }
-
- @Override
- public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldOffset = accessor.getFieldStartOffset(tIndex, outField);
- int fieldLength = accessor.getFieldLength(tIndex, outField);
- int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset
- + accessor.getFieldSlotsLength() + fieldOffset);
-
- try {
- tupleBuilder.getDataOutput().writeInt(-1);
- if (refIndex < 0) {
- tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldOffset + 4, fieldLength - 4);
- } else {
- tupleBuilder.getDataOutput().write(buf[refIndex], 0, buf[refIndex].length);
- }
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException();
- }
- }
- };
- }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
deleted file mode 100644
index 74ac53a..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-
-public class CountAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
- private int outField = -1;
-
- public CountAggregatorDescriptorFactory() {
- }
-
- public CountAggregatorDescriptorFactory(int outField) {
- this.outField = outField;
- }
-
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
-
- if (this.outField < 0) {
- this.outField = keyFields.length;
- }
- return new IAggregatorDescriptor() {
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, 1);
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
- throws HyracksDataException {
- ByteBuffer buf = ByteBuffer.wrap(data);
- int count = buf.getInt(offset);
- buf.putInt(offset, count + 1);
- return 4;
- }
-
- @Override
- public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-
- try {
- tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("Failed to write int sum as a partial result.");
- }
- }
-
- @Override
- public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-
- try {
- tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("Failed to write int sum as a partial result.");
- }
- }
-
- @Override
- public void reset() {
-
- }
- };
- }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
deleted file mode 100644
index f1e54e2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-
-public class CountAggregatorFactory implements IFieldValueResultingAggregatorFactory {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
- return new IFieldValueResultingAggregator() {
- private int count;
-
- @Override
- public void output(DataOutput resultAcceptor) throws HyracksDataException {
- try {
- resultAcceptor.writeInt(count);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- count = 0;
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- ++count;
- }
- };
- }
-
- @Override
- public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
- return new ISpillableFieldValueResultingAggregator() {
- private int count;
-
- @Override
- public void output(DataOutput resultAcceptor) throws HyracksDataException {
- try {
- resultAcceptor.writeInt(count);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
- throws HyracksDataException {
- count = IntegerSerializerDeserializer.getInt(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, fIndex));
-
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- count++;
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- count = 0;
- }
-
- @Override
- public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
- throws HyracksDataException {
- count += IntegerSerializerDeserializer.getInt(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, fIndex));
- }
- };
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
deleted file mode 100644
index 515cc21..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
-
-/**
- * SUM aggregator on float type data.
- *
- */
-public class FloatSumAggregatorFactory implements
- IFieldValueResultingAggregatorFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- private int sumField;
-
- public FloatSumAggregatorFactory(int field) {
- this.sumField = field;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
- * IFieldValueResultingAggregatorFactory
- * #createFieldValueResultingAggregator()
- */
- @Override
- public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
- return new IFieldValueResultingAggregator() {
-
- private float sum;
-
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeFloat(sum);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- sum = 0;
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
- sum += FloatSerializerDeserializer.getFloat(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- }
- };
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
- * IFieldValueResultingAggregatorFactory
- * #createSpillableFieldValueResultingAggregator()
- */
- @Override
- public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
- return new ISpillableFieldValueResultingAggregator() {
-
- private float sum;
-
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeFloat(sum);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
-
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- sum = FloatSerializerDeserializer.getFloat(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, fIndex));
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- sum = 0;
- }
-
- @Override
- public void accumulatePartialResult(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- sum += FloatSerializerDeserializer.getFloat(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, fIndex));
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
- sum += FloatSerializerDeserializer.getFloat(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- }
- };
- }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
deleted file mode 100644
index dfc31cd..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public interface IAggregatorDescriptor {
-
- /**
- * Initialize the aggregator with an input tuple specified by the input
- * frame and tuple index. This function will write the initialized partial
- * result into the tuple builder.
- *
- * @param accessor
- * @param tIndex
- * @param tupleBuilder
- * @throws HyracksDataException
- */
- public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException;
-
- /**
- * Aggregate the input tuple with the partial result specified by the bytes.
- * The new value then is written back to the bytes field specified.
- * It is the developer's responsibility to have the new result not exceed
- * the given bytes.
- *
- * @param accessor
- * @param tIndex
- * @param data
- * @param offset
- * @param length
- * @return
- * @throws HyracksDataException
- */
- public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
- throws HyracksDataException;
-
- /**
- * Output the partial aggregation result to an array tuple builder.
- * Necessary additional information for aggregation should be maintained.
- * For example, for an aggregator calculating AVG, the count and also the
- * current average should be maintained as the partial results.
- *
- * @param accessor
- * @param tIndex
- * @param tupleBuilder
- * @throws HyracksDataException
- */
- public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException;
-
- /**
- * Output the final aggregation result to an array tuple builder.
- *
- * @param accessor
- * @param tIndex
- * @param tupleBuilder
- * @return
- * @throws HyracksDataException
- */
- public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException;
-
- /**
- * reset the internal states
- */
- public void reset();
-
- /**
- * Close the aggregator. Necessary clean-up code should be implemented here.
- */
- public void close();
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java
deleted file mode 100644
index 7f6928b..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IAggregatorDescriptorFactory extends Serializable {
-
- /**
- * Create an aggregator.
- *
- * @param ctx
- * @param inRecordDescriptor
- * @param outRecordDescriptor
- * @param keyFields
- * @return
- * @throws HyracksDataException
- */
- IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException;
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregator.java
deleted file mode 100644
index 783467b..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFieldValueResultingAggregator {
- /**
- * Called once per aggregator before calling accumulate for the first time.
- *
- * @param accessor
- * - Accessor to the data tuple.
- * @param tIndex
- * - Index of the tuple in the accessor.
- * @throws HyracksDataException
- */
- public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
-
- /**
- * Called once per tuple that belongs to this group.
- *
- * @param accessor
- * - Accessor to data tuple.
- * @param tIndex
- * - Index of tuple in the accessor.
- * @throws HyracksDataException
- */
- public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
-
- /**
- * Called finally to emit output.
- *
- * @param resultAcceptor
- * - Interface to write the result to.
- * @throws HyracksDataException
- */
- public void output(DataOutput resultAcceptor) throws HyracksDataException;
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
deleted file mode 100644
index 1b7da7f..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.Serializable;
-
-public interface IFieldValueResultingAggregatorFactory extends Serializable {
- public IFieldValueResultingAggregator createFieldValueResultingAggregator();
-
- public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator();
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java
deleted file mode 100644
index d52f458..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * An extended version of the {@link IFieldValueResultingAggregator} supporting
- * external aggregation.
- */
-public interface ISpillableFieldValueResultingAggregator extends IFieldValueResultingAggregator {
-
- /**
- * Called once per aggregator before calling accumulate for the first time.
- *
- * @param accessor
- * - Accessor to the data tuple.
- * @param tIndex
- * - Index of the tuple in the accessor.
- * @throws HyracksDataException
- */
- public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex) throws HyracksDataException;
-
- /**
- * Aggregate another partial result.
- *
- * @param accessor
- * @param tIndex
- * @param fIndex
- * @throws HyracksDataException
- */
- public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
- throws HyracksDataException;
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
deleted file mode 100644
index 770d36b..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-
-public class IntSumAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
- private final int aggField;
- private int outField = -1;
-
- public IntSumAggregatorDescriptorFactory(int aggField) {
- this.aggField = aggField;
- }
-
- public IntSumAggregatorDescriptorFactory(int aggField, int outField) {
- this.aggField = aggField;
- this.outField = outField;
- }
-
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
-
- if (this.outField < 0) {
- this.outField = keyFields.length;
- }
-
- return new IAggregatorDescriptor() {
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- int sum = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
-
- tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, sum);
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
- throws HyracksDataException {
- int sum = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
- // Update the value of tuple 2
- ByteBuffer buf = ByteBuffer.wrap(data);
- sum += buf.getInt(offset);
- buf.putInt(offset, sum);
- return 4;
- }
-
- @Override
- public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-
- try {
- tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("Failed to write int sum as a partial result.");
- }
- }
-
- @Override
- public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-
- try {
- tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("Failed to write int sum as a partial result.");
- }
- }
-
- @Override
- public void reset() {
-
- }
- };
- }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
deleted file mode 100644
index 0db3511..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
-
-/**
- * Min/Max aggregator factory
- */
-public class MinMaxAggregatorFactory implements
- IFieldValueResultingAggregatorFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * indicate the type of the value: true: max false: min
- */
- private boolean type;
-
- /**
- * The field to be aggregated.
- */
- private int field;
-
- public MinMaxAggregatorFactory(boolean type, int field) {
- this.type = type;
- this.field = field;
- }
-
- @Override
- public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
- return new IFieldValueResultingAggregator() {
-
- private float minmax;
-
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeFloat(minmax);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- minmax = 0;
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, field);
- float nval = FloatSerializerDeserializer.getFloat(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- if ((type ? (nval > minmax) : (nval < minmax))) {
- minmax = nval;
- }
- }
- };
- }
-
- @Override
- public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
- return new ISpillableFieldValueResultingAggregator() {
-
- private float minmax;
-
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeFloat(minmax);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
-
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- minmax = FloatSerializerDeserializer.getFloat(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, fIndex));
-
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- minmax = 0;
- }
-
- @Override
- public void accumulatePartialResult(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- minmax = FloatSerializerDeserializer.getFloat(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, fIndex));
-
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, field);
- float nval = FloatSerializerDeserializer.getFloat(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- if ((type ? (nval > minmax) : (nval < minmax))) {
- minmax = nval;
- }
- }
- };
- }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
deleted file mode 100644
index 053b9e2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public class MultiAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
- private final IAggregatorDescriptorFactory[] aggregatorFactories;
-
- public MultiAggregatorDescriptorFactory(IAggregatorDescriptorFactory[] aggregatorFactories) {
- this.aggregatorFactories = aggregatorFactories;
- }
-
- @Override
- public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx,
- final RecordDescriptor inRecordDescriptor, final RecordDescriptor outRecordDescriptor, final int[] keyFields)
- throws HyracksDataException {
-
- final IAggregatorDescriptor[] aggregators = new IAggregatorDescriptor[this.aggregatorFactories.length];
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i] = aggregatorFactories[i].createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
- keyFields);
- }
-
- return new IAggregatorDescriptor() {
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].init(accessor, tIndex, tupleBuilder);
- }
- }
-
- @Override
- public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
- throws HyracksDataException {
- int adjust = 0;
- for (int i = 0; i < aggregators.length; i++) {
- adjust += aggregators[i].aggregate(accessor, tIndex, data, offset + adjust, length - adjust);
- }
- return adjust;
- }
-
- @Override
- public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].outputPartialResult(accessor, tIndex, tupleBuilder);
- }
- }
-
- @Override
- public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].outputResult(accessor, tIndex, tupleBuilder);
- }
- }
-
- @Override
- public void close() {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].close();
- }
- }
-
- @Override
- public void reset() {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].reset();
- }
- }
-
- };
- }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
deleted file mode 100644
index 8237110..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregator;
-import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregatorFactory;
-
-public class MultiAggregatorFactory implements IAccumulatingAggregatorFactory {
- private static final long serialVersionUID = 1L;
-
- private IFieldValueResultingAggregatorFactory[] aFactories;
-
- public MultiAggregatorFactory(IFieldValueResultingAggregatorFactory[] aFactories) {
- this.aFactories = aFactories;
- }
-
- @Override
- public IAccumulatingAggregator createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDescriptor) throws HyracksDataException {
- final IFieldValueResultingAggregator aggregators[] = new IFieldValueResultingAggregator[aFactories.length];
- for (int i = 0; i < aFactories.length; ++i) {
- aggregators[i] = aFactories[i].createFieldValueResultingAggregator();
- }
- final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
- return new IAccumulatingAggregator() {
-
- private boolean pending;
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- tb.reset();
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].init(accessor, tIndex);
- }
- pending = false;
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].accumulate(accessor, tIndex);
- }
- }
-
- @Override
- public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex,
- int[] keyFieldIndexes) throws HyracksDataException {
- if (!pending) {
- tb.reset();
- for (int i = 0; i < keyFieldIndexes.length; ++i) {
- tb.addField(accessor, tIndex, keyFieldIndexes[i]);
- }
- DataOutput dos = tb.getDataOutput();
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].output(dos);
- tb.addFieldEndOffset();
- }
- }
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- pending = true;
- return false;
- }
- return true;
- }
-
- };
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
deleted file mode 100644
index 0817263..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-
-/**
- * SUM aggregator factory (for integer only; another SUM aggregator for floats
- * is available at {@link FloatSumAggregatorFactory})
- */
-public class SumAggregatorFactory implements
- IFieldValueResultingAggregatorFactory {
-
- private int sumField;
-
- public SumAggregatorFactory(int field) {
- sumField = field;
- }
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
- * IFieldValueResultingAggregatorFactory
- * #createFieldValueResultingAggregator()
- */
- @Override
- public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
- return new IFieldValueResultingAggregator() {
- private int sum;
-
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeInt(sum);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- sum = 0;
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- }
- };
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.dataflow.std.aggregators.spillable.
- * ISpillableFieldValueResultingAggregatorFactory
- * #createFieldValueResultingAggregator()
- */
- @Override
- public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
- return new ISpillableFieldValueResultingAggregator() {
-
- private int sum;
-
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeInt(sum);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
-
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- sum = IntegerSerializerDeserializer.getInt(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, fIndex));
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- sum = 0;
- }
-
- @Override
- public void accumulatePartialResult(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- sum += IntegerSerializerDeserializer.getInt(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, fIndex));
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- }
- };
- }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
similarity index 94%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
index 4cda69e..275bc56 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
import java.io.Serializable;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index 4ac72f6..07da19b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -90,11 +90,6 @@
}
@Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
if (index != 0) {
throw new IllegalArgumentException();
@@ -121,4 +116,10 @@
}
return 0;
}
+
+ @Override
+ public void fail() throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index b829f5b..7bef7a9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -39,14 +39,11 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
@@ -55,7 +52,11 @@
import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+/**
+ *
+ */
public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
private static final int AGGREGATE_ACTIVITY_ID = 0;
private static final int MERGE_ACTIVITY_ID = 1;
@@ -64,16 +65,22 @@
private final int[] keyFields;
private final IBinaryComparatorFactory[] comparatorFactories;
private final INormalizedKeyComputerFactory firstNormalizerFactory;
+
private final IAggregatorDescriptorFactory aggregatorFactory;
- private final IAggregatorDescriptorFactory mergeFactory;
+ private final IAggregatorDescriptorFactory mergerFactory;
+
private final int framesLimit;
private final ISpillableTableFactory spillableTableFactory;
private final boolean isOutputSorted;
- public ExternalGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
- IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
- IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergeFactory,
- RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
+ public ExternalGroupOperatorDescriptor(JobSpecification spec,
+ int[] keyFields, int framesLimit,
+ IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory firstNormalizerFactory,
+ IAggregatorDescriptorFactory aggregatorFactory,
+ IAggregatorDescriptorFactory mergerFactory,
+ RecordDescriptor recordDescriptor,
+ ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
super(spec, 1, 1);
this.framesLimit = framesLimit;
if (framesLimit <= 1) {
@@ -81,11 +88,12 @@
* Minimum of 2 frames: 1 for input records, and 1 for output
* aggregation results.
*/
- throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
+ throw new IllegalStateException(
+ "frame limit should at least be 2, but it is "
+ + framesLimit + "!");
}
-
this.aggregatorFactory = aggregatorFactory;
- this.mergeFactory = mergeFactory;
+ this.mergerFactory = mergerFactory;
this.keyFields = keyFields;
this.comparatorFactories = comparatorFactories;
this.firstNormalizerFactory = firstNormalizerFactory;
@@ -93,17 +101,25 @@
this.isOutputSorted = isOutputSorted;
/**
- * Set the record descriptor. Note that since
- * this operator is a unary operator,
- * only the first record descriptor is used here.
+ * Set the record descriptor. Note that since this operator is a unary
+ * operator, only the first record descriptor is used here.
*/
recordDescriptors[0] = recordDescriptor;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
+ * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
+ */
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
- MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
+ AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(
+ getOperatorId(), AGGREGATE_ACTIVITY_ID));
+ MergeActivity mergeAct = new MergeActivity(new ActivityId(odId,
+ MERGE_ACTIVITY_ID));
builder.addActivity(aggregateAct);
builder.addSourceEdge(0, aggregateAct, 0);
@@ -145,28 +161,35 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider,
+ final int partition, int nPartitions)
throws HyracksDataException {
- final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ final FrameTupleAccessor accessor = new FrameTupleAccessor(
+ ctx.getFrameSize(),
+ recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0));
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private AggregateActivityState state;
@Override
public void open() throws HyracksDataException {
- state = new AggregateActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
- partition));
+ state = new AggregateActivityState(ctx.getJobletContext()
+ .getJobId(), new TaskId(getActivityId(), partition));
state.runs = new LinkedList<RunFileReader>();
- state.gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
+ state.gTable = spillableTableFactory.buildSpillableTable(
+ ctx, keyFields, comparatorFactories,
firstNormalizerFactory, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+ recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0), recordDescriptors[0],
ExternalGroupOperatorDescriptor.this.framesLimit);
state.gTable.reset();
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
@@ -206,12 +229,15 @@
private void flushFramesToRun() throws HyracksDataException {
FileReference runFile;
try {
- runFile = ctx.getJobletContext().createManagedWorkspaceFile(
- ExternalGroupOperatorDescriptor.class.getSimpleName());
+ runFile = ctx.getJobletContext()
+ .createManagedWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class
+ .getSimpleName());
} catch (IOException e) {
throw new HyracksDataException(e);
}
- RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
+ RunFileWriter writer = new RunFileWriter(runFile,
+ ctx.getIOManager());
writer.open();
try {
state.gTable.sortFrames();
@@ -237,15 +263,28 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider,
+ final int partition, int nPartitions)
throws HyracksDataException {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
+ comparators[i] = comparatorFactories[i]
+ .createBinaryComparator();
}
- final IAggregatorDescriptor currentWorkingAggregator = mergeFactory.createAggregator(ctx,
- recordDescriptors[0], recordDescriptors[0], keyFields);
+
+ int[] keyFieldsInPartialResults = new int[keyFields.length];
+ for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+ keyFieldsInPartialResults[i] = i;
+ }
+
+ final IAggregatorDescriptor aggregator = mergerFactory
+ .createAggregator(ctx, recordDescriptors[0],
+ recordDescriptors[0], keyFields, keyFieldsInPartialResults);
+ final AggregateState aggregateState = aggregator
+ .createAggregateStates();
+
final int[] storedKeys = new int[keyFields.length];
/**
* Get the list of the fields in the stored records.
@@ -253,10 +292,6 @@
for (int i = 0; i < keyFields.length; ++i) {
storedKeys[i] = i;
}
- /**
- * Tuple builder
- */
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
/**
@@ -280,15 +315,17 @@
private int[] currentFrameIndexInRun;
private int[] currentRunFrames;
- private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
- private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescriptors[0]);
- private ArrayTupleBuilder finalTupleBuilder;
+ private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
+ ctx.getFrameSize(), recordDescriptors[0]);
private FrameTupleAppender writerFrameAppender;
public void initialize() throws HyracksDataException {
- aggState = (AggregateActivityState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
- AGGREGATE_ACTIVITY_ID), partition));
+ aggState = (AggregateActivityState) ctx
+ .getTaskState(new TaskId(new ActivityId(
+ getOperatorId(), AGGREGATE_ACTIVITY_ID),
+ partition));
runs = aggState.runs;
writer.open();
try {
@@ -318,11 +355,13 @@
writer.fail();
throw new HyracksDataException(e);
} finally {
+ aggregateState.close();
writer.close();
}
}
- private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
+ private void doPass(LinkedList<RunFileReader> runs)
+ throws HyracksDataException {
FileReference newRun = null;
IFrameWriter writer = this.writer;
boolean finalPass = false;
@@ -337,8 +376,10 @@
runNumber = runs.size();
} else {
runNumber = framesLimit - 2;
- newRun = ctx.getJobletContext().createManagedWorkspaceFile(
- ExternalGroupOperatorDescriptor.class.getSimpleName());
+ newRun = ctx.getJobletContext()
+ .createManagedWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class
+ .getSimpleName());
writer = new RunFileWriter(newRun, ctx.getIOManager());
writer.open();
}
@@ -346,14 +387,16 @@
currentFrameIndexInRun = new int[runNumber];
currentRunFrames = new int[runNumber];
/**
- * Create file readers for each input run file, only
- * for the ones fit into the inFrames
+ * Create file readers for each input run file, only for
+ * the ones fit into the inFrames
*/
RunFileReader[] runFileReaders = new RunFileReader[runNumber];
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+ FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
+ .size()];
Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
- recordDescriptors[0], runNumber, comparator);
+ ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
+ ctx.getFrameSize(), recordDescriptors[0],
+ runNumber, comparator);
/**
* current tuple index in each run
*/
@@ -366,18 +409,25 @@
runFileReaders[runIndex].open();
currentRunFrames[runIndex] = 0;
- currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
+ currentFrameIndexInRun[runIndex] = runIndex
+ * runFrameLimit;
for (int j = 0; j < runFrameLimit; j++) {
- int frameIndex = currentFrameIndexInRun[runIndex] + j;
- if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
- tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
+ int frameIndex = currentFrameIndexInRun[runIndex]
+ + j;
+ if (runFileReaders[runIndex].nextFrame(inFrames
+ .get(frameIndex))) {
+ tupleAccessors[frameIndex] = new FrameTupleAccessor(
+ ctx.getFrameSize(),
recordDescriptors[0]);
- tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+ tupleAccessors[frameIndex].reset(inFrames
+ .get(frameIndex));
currentRunFrames[runIndex]++;
if (j == 0)
- setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors,
+ setNextTopTuple(runIndex, tupleIndices,
+ runFileReaders, tupleAccessors,
topTuples);
} else {
+ closeRun(runIndex, runFileReaders, tupleAccessors);
break;
}
}
@@ -395,52 +445,49 @@
int runIndex = topTuples.peek().getRunid();
FrameTupleAccessor fta = top.getAccessor();
- int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+ int currentTupleInOutFrame = outFrameAccessor
+ .getTupleCount() - 1;
if (currentTupleInOutFrame < 0
- || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+ || compareFrameTuples(fta, tupleIndex,
+ outFrameAccessor,
+ currentTupleInOutFrame) != 0) {
/**
- * If a new group comes
- * Initialize the first output record
- * Reset the tuple builder
+ * Initialize the first output record Reset the
+ * tuple builder
*/
- tupleBuilder.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tupleBuilder.addField(fta, tupleIndex, i);
- }
-
- currentWorkingAggregator.init(fta, tupleIndex, tupleBuilder);
- if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ if (!aggregator.initFromPartial(outFrameAppender, fta,
+ tupleIndex, aggregateState)) {
flushOutFrame(writer, finalPass);
- if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()))
+ if (!aggregator.initFromPartial(outFrameAppender, fta,
+ tupleIndex, aggregateState)) {
throw new HyracksDataException(
"Failed to append an aggregation result to the output frame.");
+ }
}
+
} else {
/**
* if new tuple is in the same group of the
- * current aggregator
- * do merge and output to the outFrame
+ * current aggregator do merge and output to the
+ * outFrame
*/
- int tupleOffset = outFrameAccessor.getTupleStartOffset(currentTupleInOutFrame);
- int fieldOffset = outFrameAccessor.getFieldStartOffset(currentTupleInOutFrame,
- keyFields.length);
- int fieldLength = outFrameAccessor.getFieldLength(currentTupleInOutFrame,
- keyFields.length);
- currentWorkingAggregator.aggregate(fta, tupleIndex, outFrameAccessor.getBuffer()
- .array(), tupleOffset + outFrameAccessor.getFieldSlotsLength() + fieldOffset,
- fieldLength);
+
+ aggregator.aggregate(fta, tupleIndex,
+ outFrameAccessor,
+ currentTupleInOutFrame, aggregateState);
+
}
tupleIndices[runIndex]++;
- setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+ setNextTopTuple(runIndex, tupleIndices,
+ runFileReaders, tupleAccessors, topTuples);
}
if (outFrameAppender.getTupleCount() > 0) {
flushOutFrame(writer, finalPass);
}
- currentWorkingAggregator.close();
+ aggregator.close();
+
runs.subList(0, runNumber).clear();
/**
* insert the new run file into the beginning of the run
@@ -456,36 +503,38 @@
}
}
- private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
- if (finalTupleBuilder == null) {
- finalTupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
- }
+ private void flushOutFrame(IFrameWriter writer, boolean isFinal)
+ throws HyracksDataException {
if (writerFrame == null) {
writerFrame = ctx.allocateFrame();
}
if (writerFrameAppender == null) {
- writerFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ writerFrameAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
writerFrameAppender.reset(writerFrame, true);
}
outFrameAccessor.reset(outFrame);
- for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
- finalTupleBuilder.reset();
- for (int j = 0; j < keyFields.length; j++) {
- finalTupleBuilder.addField(outFrameAccessor, i, j);
- }
- if (isFinal)
- currentWorkingAggregator.outputResult(outFrameAccessor, i, finalTupleBuilder);
- else
- currentWorkingAggregator.outputPartialResult(outFrameAccessor, i, finalTupleBuilder);
- if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
- finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerFrameAppender.reset(writerFrame, true);
- if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
- finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize()))
- throw new HyracksDataException(
- "Failed to write final aggregation result to a writer frame!");
+ for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+
+ if(isFinal){
+ if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerFrameAppender.reset(writerFrame, true);
+ if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+ throw new HyracksDataException(
+ "Failed to write final aggregation result to a writer frame!");
+ }
+ }
+ } else {
+ if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerFrameAppender.reset(writerFrame, true);
+ if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+ throw new HyracksDataException(
+ "Failed to write final aggregation result to a writer frame!");
+ }
+ }
}
}
if (writerFrameAppender.getTupleCount() > 0) {
@@ -495,12 +544,15 @@
outFrameAppender.reset(outFrame, true);
}
- private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
+ private void setNextTopTuple(int runIndex, int[] tupleIndices,
+ RunFileReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors,
+ ReferencedPriorityQueue topTuples)
throws HyracksDataException {
int runStart = runIndex * runFrameLimit;
boolean existNext = false;
- if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
+ if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null
+ || runCursors[runIndex] == null) {
/**
* run already closed
*/
@@ -510,7 +562,8 @@
* not the last frame for this run
*/
existNext = true;
- if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+ if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]]
+ .getTupleCount()) {
tupleIndices[runIndex] = 0;
currentFrameIndexInRun[runIndex]++;
}
@@ -533,13 +586,15 @@
*/
currentRunFrames[runIndex] = 0;
for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
- ByteBuffer buffer = tupleAccessors[frameOffset].getBuffer();
+ ByteBuffer buffer = tupleAccessors[frameOffset]
+ .getBuffer();
if (runCursors[runIndex].nextFrame(buffer)) {
tupleAccessors[frameOffset].reset(buffer);
if (tupleAccessors[frameOffset].getTupleCount() > 0) {
existNext = true;
} else {
- throw new IllegalStateException("illegal: empty run file");
+ throw new IllegalStateException(
+ "illegal: empty run file");
}
currentRunFrames[runIndex]++;
} else {
@@ -549,8 +604,10 @@
}
if (existNext) {
- topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]],
- tupleIndices[runIndex]);
+ topTuples
+ .popAndReplace(
+ tupleAccessors[currentFrameIndexInRun[runIndex]],
+ tupleIndices[runIndex]);
} else {
topTuples.pop();
closeRun(runIndex, runCursors, tupleAccessors);
@@ -566,23 +623,28 @@
* @param tupleAccessor
* @throws HyracksDataException
*/
- private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+ private void closeRun(int index, RunFileReader[] runCursors,
+ IFrameTupleAccessor[] tupleAccessor)
throws HyracksDataException {
if (runCursors[index] != null) {
runCursors[index].close();
runCursors[index] = null;
+ tupleAccessor[index] = null;
}
}
- private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+ private int compareFrameTuples(IFrameTupleAccessor fta1,
+ int j1, IFrameTupleAccessor fta2, int j2) {
byte[] b1 = fta1.getBuffer().array();
byte[] b2 = fta2.getBuffer().array();
for (int f = 0; f < keyFields.length; ++f) {
int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ int s1 = fta1.getTupleStartOffset(j1)
+ + fta1.getFieldSlotsLength()
+ fta1.getFieldStartOffset(j1, fIdx);
int l1 = fta1.getFieldLength(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ int s2 = fta2.getTupleStartOffset(j2)
+ + fta2.getFieldSlotsLength()
+ fta2.getFieldStartOffset(j2, fIdx);
int l2_start = fta2.getFieldStartOffset(j2, fIdx);
int l2_end = fta2.getFieldEndOffset(j2, fIdx);
@@ -598,25 +660,32 @@
return op;
}
- private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+ private Comparator<ReferenceEntry> createEntryComparator(
+ final IBinaryComparator[] comparators) {
return new Comparator<ReferenceEntry>() {
@Override
public int compare(ReferenceEntry o1, ReferenceEntry o2) {
- FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
+ FrameTupleAccessor fta1 = (FrameTupleAccessor) o1
+ .getAccessor();
+ FrameTupleAccessor fta2 = (FrameTupleAccessor) o2
+ .getAccessor();
int j1 = o1.getTupleIndex();
int j2 = o2.getTupleIndex();
byte[] b1 = fta1.getBuffer().array();
byte[] b2 = fta2.getBuffer().array();
for (int f = 0; f < keyFields.length; ++f) {
int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ int s1 = fta1.getTupleStartOffset(j1)
+ + fta1.getFieldSlotsLength()
+ fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ int l1 = fta1.getFieldEndOffset(j1, fIdx)
+ - fta1.getFieldStartOffset(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2)
+ + fta2.getFieldSlotsLength()
+ fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldEndOffset(j2, fIdx)
+ - fta2.getFieldStartOffset(j2, fIdx);
int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
if (c != 0) {
return c;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
index 5733c11..b1af426 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
@@ -34,7 +34,8 @@
class GroupingHashTable {
/**
- * The pointers in the link store 3 int values for each entry in the hashtable: (bufferIdx, tIndex, accumulatorIdx).
+ * The pointers in the link store 3 int values for each entry in the
+ * hashtable: (bufferIdx, tIndex, accumulatorIdx).
*
* @author vinayakb
*/
@@ -59,53 +60,69 @@
}
}
- private static final int INIT_ACCUMULATORS_SIZE = 8;
+ private static final int INIT_AGG_STATE_SIZE = 8;
private final IHyracksTaskContext ctx;
private final FrameTupleAppender appender;
private final List<ByteBuffer> buffers;
private final Link[] table;
- private IAccumulatingAggregator[] accumulators;
+ /**
+ * Aggregate states: a list of states for all groups maintained in the main
+ * memory.
+ */
+ private AggregateState[] aggregateStates;
private int accumulatorSize;
private int lastBIndex;
- private final int[] fields;
private final int[] storedKeys;
private final IBinaryComparator[] comparators;
private final FrameTuplePairComparator ftpc;
private final ITuplePartitionComputer tpc;
- private final IAccumulatingAggregatorFactory aggregatorFactory;
- private final RecordDescriptor inRecordDescriptor;
- private final RecordDescriptor outRecordDescriptor;
+ private final IAggregatorDescriptor aggregator;
private final FrameTupleAccessor storedKeysAccessor;
- GroupingHashTable(IHyracksTaskContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
- ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
- RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
+ GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
+ IBinaryComparatorFactory[] comparatorFactories,
+ ITuplePartitionComputerFactory tpcf,
+ IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int tableSize)
+ throws HyracksDataException {
this.ctx = ctx;
appender = new FrameTupleAppender(ctx.getFrameSize());
buffers = new ArrayList<ByteBuffer>();
table = new Link[tableSize];
- accumulators = new IAccumulatingAggregator[INIT_ACCUMULATORS_SIZE];
- accumulatorSize = 0;
- this.fields = fields;
+
storedKeys = new int[fields.length];
+ @SuppressWarnings("rawtypes")
ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
for (int i = 0; i < fields.length; ++i) {
storedKeys[i] = i;
storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
}
+
comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
tpc = tpcf.createPartitioner();
- this.aggregatorFactory = aggregatorFactory;
- this.inRecordDescriptor = inRecordDescriptor;
- this.outRecordDescriptor = outRecordDescriptor;
- RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
- storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
+
+ int[] keyFieldsInPartialResults = new int[fields.length];
+ for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+ keyFieldsInPartialResults[i] = i;
+ }
+
+ this.aggregator = aggregatorFactory.createAggregator(ctx,
+ inRecordDescriptor, outRecordDescriptor, fields, keyFieldsInPartialResults);
+
+ this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
+ accumulatorSize = 0;
+
+ RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
+ storedKeySerDeser);
+ storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ storedKeysRecordDescriptor);
lastBIndex = -1;
addNewBuffer();
}
@@ -119,7 +136,8 @@
++lastBIndex;
}
- private void flushFrame(FrameTupleAppender appender, IFrameWriter writer) throws HyracksDataException {
+ private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
+ throws HyracksDataException {
ByteBuffer frame = appender.getBuffer();
frame.position(0);
frame.limit(frame.capacity());
@@ -127,44 +145,56 @@
appender.reset(appender.getBuffer(), true);
}
- void insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
int entry = tpc.partition(accessor, tIndex, table.length);
Link link = table[entry];
if (link == null) {
link = table[entry] = new Link();
}
- IAccumulatingAggregator aggregator = null;
+ int saIndex = -1;
for (int i = 0; i < link.size; i += 3) {
int sbIndex = link.pointers[i];
int stIndex = link.pointers[i + 1];
- int saIndex = link.pointers[i + 2];
storedKeysAccessor.reset(buffers.get(sbIndex));
int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
if (c == 0) {
- aggregator = accumulators[saIndex];
+ saIndex = link.pointers[i + 2];
break;
}
}
- if (aggregator == null) {
+ if (saIndex < 0) {
// Did not find the key. Insert a new entry.
- if (!appender.appendProjection(accessor, tIndex, fields)) {
+ saIndex = accumulatorSize++;
+ // Add keys
+
+ // Add index to the keys in frame
+ int sbIndex = lastBIndex;
+ int stIndex = appender.getTupleCount();
+
+ // Add aggregation fields
+ AggregateState newState = aggregator.createAggregateStates();
+
+ if(!aggregator.init(appender, accessor, tIndex, newState)){
addNewBuffer();
- if (!appender.appendProjection(accessor, tIndex, fields)) {
+ sbIndex = lastBIndex;
+ stIndex = appender.getTupleCount();
+ if(!aggregator.init(appender, accessor, tIndex, newState)){
throw new IllegalStateException();
}
}
- int sbIndex = lastBIndex;
- int stIndex = appender.getTupleCount() - 1;
- if (accumulatorSize >= accumulators.length) {
- accumulators = Arrays.copyOf(accumulators, accumulators.length * 2);
+
+ if (accumulatorSize >= aggregateStates.length) {
+ aggregateStates = Arrays.copyOf(aggregateStates,
+ aggregateStates.length * 2);
}
- int saIndex = accumulatorSize++;
- aggregator = accumulators[saIndex] = aggregatorFactory.createAggregator(ctx, inRecordDescriptor,
- outRecordDescriptor);
- aggregator.init(accessor, tIndex);
+
+ aggregateStates[saIndex] = newState;
+
link.add(sbIndex, stIndex, saIndex);
+ } else {
+ aggregator.aggregate(accessor, tIndex, null, 0,
+ aggregateStates[saIndex]);
}
- aggregator.accumulate(accessor, tIndex);
}
void write(IFrameWriter writer) throws HyracksDataException {
@@ -179,8 +209,10 @@
int aIndex = link.pointers[j + 2];
ByteBuffer keyBuffer = buffers.get(bIndex);
storedKeysAccessor.reset(keyBuffer);
- IAccumulatingAggregator aggregator = accumulators[aIndex];
- while (!aggregator.output(appender, storedKeysAccessor, tIndex, storedKeys)) {
+
+ while (!aggregator
+ .outputFinalResult(appender, storedKeysAccessor,
+ tIndex, aggregateStates[aIndex])) {
flushFrame(appender, writer);
}
}
@@ -190,4 +222,10 @@
flushFrame(appender, writer);
}
}
+
+ void close() throws HyracksDataException {
+ for(AggregateState aState : aggregateStates){
+ aState.close();
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index d0cd895..c089dd4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -38,7 +38,11 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+/**
+ *
+ */
public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
private static final int HASH_BUILD_ACTIVITY_ID = 0;
private static final int OUTPUT_ACTIVITY_ID = 1;
@@ -48,27 +52,40 @@
private final int[] keys;
private final ITuplePartitionComputerFactory tpcf;
private final IBinaryComparatorFactory[] comparatorFactories;
- private final IAccumulatingAggregatorFactory aggregatorFactory;
+
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+
private final int tableSize;
- public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys, ITuplePartitionComputerFactory tpcf,
- IBinaryComparatorFactory[] comparatorFactories, IAccumulatingAggregatorFactory aggregatorFactory,
- RecordDescriptor recordDescriptor, int tableSize) {
+ public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys,
+ ITuplePartitionComputerFactory tpcf,
+ IBinaryComparatorFactory[] comparatorFactories,
+ IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor outRecordDescriptor, int tableSize) {
super(spec, 1, 1);
this.keys = keys;
this.tpcf = tpcf;
this.comparatorFactories = comparatorFactories;
this.aggregatorFactory = aggregatorFactory;
- recordDescriptors[0] = recordDescriptor;
+ recordDescriptors[0] = outRecordDescriptor;
this.tableSize = tableSize;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
+ * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
+ */
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, HASH_BUILD_ACTIVITY_ID));
+ HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId,
+ HASH_BUILD_ACTIVITY_ID));
builder.addActivity(ha);
- OutputActivity oa = new OutputActivity(new ActivityId(odId, OUTPUT_ACTIVITY_ID));
+ OutputActivity oa = new OutputActivity(new ActivityId(odId,
+ OUTPUT_ACTIVITY_ID));
builder.addActivity(oa);
builder.addSourceEdge(0, ha, 0);
@@ -105,28 +122,40 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IRecordDescriptorProvider recordDescProvider,
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider,
final int partition, int nPartitions) {
- final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ final FrameTupleAccessor accessor = new FrameTupleAccessor(
+ ctx.getFrameSize(),
+ recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0));
return new AbstractUnaryInputSinkOperatorNodePushable() {
private HashBuildActivityState state;
@Override
public void open() throws HyracksDataException {
- state = new HashBuildActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
- partition));
- state.table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+ state = new HashBuildActivityState(ctx.getJobletContext()
+ .getJobId(), new TaskId(getActivityId(), partition));
+ state.table = new GroupingHashTable(ctx, keys,
+ comparatorFactories, tpcf, aggregatorFactory,
+ recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0), recordDescriptors[0],
tableSize);
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
- state.table.insert(accessor, i);
+ try {
+ state.table.insert(accessor, i);
+ } catch (Exception e) {
+ System.out.println(e.toString());
+ throw new HyracksDataException(e);
+ }
}
}
@@ -137,6 +166,8 @@
@Override
public void fail() throws HyracksDataException {
+ throw new HyracksDataException(
+ "HashGroupOperator is failed.");
}
};
}
@@ -150,13 +181,17 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider,
final int partition, int nPartitions) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- HashBuildActivityState buildState = (HashBuildActivityState) ctx.getTaskState(new TaskId(
- new ActivityId(getOperatorId(), HASH_BUILD_ACTIVITY_ID), partition));
+ HashBuildActivityState buildState = (HashBuildActivityState) ctx
+ .getTaskState(new TaskId(new ActivityId(
+ getOperatorId(), HASH_BUILD_ACTIVITY_ID),
+ partition));
GroupingHashTable table = buildState.table;
writer.open();
try {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
deleted file mode 100644
index b20a93d..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
-import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
-import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
-
-public class HashSpillableGroupingTableFactory implements ISpillableTableFactory {
- private static final long serialVersionUID = 1L;
- private final ITuplePartitionComputerFactory tpcf;
- private final int tableSize;
-
- public HashSpillableGroupingTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize) {
- this.tpcf = tpcf;
- this.tableSize = tableSize;
- }
-
- @Override
- public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, final int[] keyFields,
- final IBinaryComparatorFactory[] comparatorFactories,
- final INormalizedKeyComputerFactory firstKeyNormalizerFactory,
- final IAggregatorDescriptorFactory aggregateDescriptorFactory, final RecordDescriptor inRecordDescriptor,
- final RecordDescriptor outRecordDescriptor, final int framesLimit) throws HyracksDataException {
- final int[] storedKeys = new int[keyFields.length];
- @SuppressWarnings("rawtypes")
- ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
- for (int i = 0; i < keyFields.length; i++) {
- storedKeys[i] = i;
- storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
- }
-
- RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
- final FrameTupleAccessor storedKeysAccessor1;
- final FrameTupleAccessor storedKeysAccessor2;
- if (keyFields.length >= outRecordDescriptor.getFields().length) {
- // for the case of zero-aggregations
- ISerializerDeserializer<?>[] fields = outRecordDescriptor.getFields();
- ITypeTrait[] types = outRecordDescriptor.getTypeTraits();
- ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
- for (int i = 0; i < fields.length; i++)
- newFields[i] = fields[i];
- ITypeTrait[] newTypes = null;
- if (types != null) {
- newTypes = new ITypeTrait[types.length + 1];
- for (int i = 0; i < types.length; i++)
- newTypes[i] = types[i];
- }
- internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
- }
- storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
- storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
-
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
-
- final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(keyFields, storedKeys, comparators);
- final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(storedKeys, storedKeys, comparators);
- final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- final ITuplePartitionComputer tpc = tpcf.createPartitioner();
- final ByteBuffer outFrame = ctx.allocateFrame();
-
- final ArrayTupleBuilder internalTupleBuilder;
- if (keyFields.length < outRecordDescriptor.getFields().length)
- internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
- else
- internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
- final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
- final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
- .createNormalizedKeyComputer();
-
- return new ISpillableTable() {
- private int dataFrameCount;
- private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);;
- private final TuplePointer storedTuplePointer = new TuplePointer();
- private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
- private IAggregatorDescriptor aggregator = aggregateDescriptorFactory.createAggregator(ctx,
- inRecordDescriptor, outRecordDescriptor, keyFields);
-
- /**
- * A tuple is "pointed" to by 3 entries in the tPointers array. [0]
- * = Frame index in the "Frames" list, [1] = Tuple index in the
- * frame, [2] = Poor man's normalized key for the tuple.
- */
- private int[] tPointers;
-
- @Override
- public void reset() {
- dataFrameCount = -1;
- tPointers = null;
- table.reset();
- aggregator.close();
- }
-
- @Override
- public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- if (dataFrameCount < 0)
- nextAvailableFrame();
- int entry = tpc.partition(accessor, tIndex, tableSize);
- boolean foundGroup = false;
- int offset = 0;
- do {
- table.getTuplePointer(entry, offset++, storedTuplePointer);
- if (storedTuplePointer.frameIndex < 0)
- break;
- storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));
- int c = ftpcPartial.compare(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex);
- if (c == 0) {
- foundGroup = true;
- break;
- }
- } while (true);
-
- if (!foundGroup) {
- /**
- * If no matching group is found, create a new aggregator
- * Create a tuple for the new group
- */
- internalTupleBuilder.reset();
- for (int i = 0; i < keyFields.length; i++) {
- internalTupleBuilder.addField(accessor, tIndex, keyFields[i]);
- }
- aggregator.init(accessor, tIndex, internalTupleBuilder);
- if (!appender.append(internalTupleBuilder.getFieldEndOffsets(),
- internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
- if (!nextAvailableFrame()) {
- return false;
- } else {
- if (!appender.append(internalTupleBuilder.getFieldEndOffsets(),
- internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
- throw new IllegalStateException("Failed to init an aggregator");
- }
- }
- }
-
- storedTuplePointer.frameIndex = dataFrameCount;
- storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
- table.insert(entry, storedTuplePointer);
- } else {
- // If there is a matching found, do aggregation directly
- int tupleOffset = storedKeysAccessor1.getTupleStartOffset(storedTuplePointer.tupleIndex);
- int aggFieldOffset = storedKeysAccessor1.getFieldStartOffset(storedTuplePointer.tupleIndex,
- keyFields.length);
- int tupleLength = storedKeysAccessor1.getFieldLength(storedTuplePointer.tupleIndex,
- keyFields.length);
- aggregator.aggregate(accessor, tIndex, storedKeysAccessor1.getBuffer().array(), tupleOffset
- + storedKeysAccessor1.getFieldSlotsLength() + aggFieldOffset, tupleLength);
- }
- return true;
- }
-
- @Override
- public List<ByteBuffer> getFrames() {
- return frames;
- }
-
- @Override
- public int getFrameCount() {
- return dataFrameCount;
- }
-
- @Override
- public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- writer.open();
- appender.reset(outFrame, true);
- if (tPointers == null) {
- // Not sorted
- for (int i = 0; i < tableSize; ++i) {
- int entry = i;
- int offset = 0;
- do {
- table.getTuplePointer(entry, offset++, storedTuplePointer);
- if (storedTuplePointer.frameIndex < 0)
- break;
- int bIndex = storedTuplePointer.frameIndex;
- int tIndex = storedTuplePointer.tupleIndex;
- storedKeysAccessor1.reset(frames.get(bIndex));
- // Reset the tuple for the partial result
- outputTupleBuilder.reset();
- for (int k = 0; k < keyFields.length; k++) {
- outputTupleBuilder.addField(storedKeysAccessor1, tIndex, k);
- }
- if (isPartial)
- aggregator.outputPartialResult(storedKeysAccessor1, tIndex, outputTupleBuilder);
- else
- aggregator.outputResult(storedKeysAccessor1, tIndex, outputTupleBuilder);
- while (!appender.append(outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- }
- } while (true);
- }
- if (appender.getTupleCount() != 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
- aggregator.close();
- return;
- }
- int n = tPointers.length / 3;
- for (int ptr = 0; ptr < n; ptr++) {
- int tableIndex = tPointers[ptr * 3];
- int rowIndex = tPointers[ptr * 3 + 1];
- table.getTuplePointer(tableIndex, rowIndex, storedTuplePointer);
- int frameIndex = storedTuplePointer.frameIndex;
- int tupleIndex = storedTuplePointer.tupleIndex;
- // Get the frame containing the value
- ByteBuffer buffer = frames.get(frameIndex);
- storedKeysAccessor1.reset(buffer);
-
- outputTupleBuilder.reset();
- for (int k = 0; k < keyFields.length; k++) {
- outputTupleBuilder.addField(storedKeysAccessor1, tupleIndex, k);
- }
- if (isPartial)
- aggregator.outputPartialResult(storedKeysAccessor1, tupleIndex, outputTupleBuilder);
- else
- aggregator.outputResult(storedKeysAccessor1, tupleIndex, outputTupleBuilder);
- if (!appender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(), 0,
- outputTupleBuilder.getSize())) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!appender.append(outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
- aggregator.close();
- }
-
- /**
- * Set the working frame to the next available frame in the frame
- * list. There are two cases:<br>
- * 1) If the next frame is not initialized, allocate a new frame. 2)
- * When frames are already created, they are recycled.
- *
- * @return Whether a new frame is added successfully.
- */
- private boolean nextAvailableFrame() {
- // Return false if the number of frames is equal to the limit.
- if (dataFrameCount + 1 >= framesLimit)
- return false;
-
- if (frames.size() < framesLimit) {
- // Insert a new frame
- ByteBuffer frame = ctx.allocateFrame();
- frame.position(0);
- frame.limit(frame.capacity());
- frames.add(frame);
- appender.reset(frame, true);
- dataFrameCount = frames.size() - 1;
- } else {
- // Reuse an old frame
- dataFrameCount++;
- ByteBuffer frame = frames.get(dataFrameCount);
- frame.position(0);
- frame.limit(frame.capacity());
- appender.reset(frame, true);
- }
- return true;
- }
-
- @Override
- public void sortFrames() {
- int sfIdx = storedKeys[0];
- int totalTCount = table.getTupleCount();
- tPointers = new int[totalTCount * 3];
- int ptr = 0;
-
- for (int i = 0; i < tableSize; i++) {
- int entry = i;
- int offset = 0;
- do {
- table.getTuplePointer(entry, offset, storedTuplePointer);
- if (storedTuplePointer.frameIndex < 0)
- break;
- tPointers[ptr * 3] = entry;
- tPointers[ptr * 3 + 1] = offset;
- table.getTuplePointer(entry, offset, storedTuplePointer);
- int fIndex = storedTuplePointer.frameIndex;
- int tIndex = storedTuplePointer.tupleIndex;
- storedKeysAccessor1.reset(frames.get(fIndex));
- int tStart = storedKeysAccessor1.getTupleStartOffset(tIndex);
- int f0StartRel = storedKeysAccessor1.getFieldStartOffset(tIndex, sfIdx);
- int f0EndRel = storedKeysAccessor1.getFieldEndOffset(tIndex, sfIdx);
- int f0Start = f0StartRel + tStart + storedKeysAccessor1.getFieldSlotsLength();
- tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc.normalize(storedKeysAccessor1.getBuffer()
- .array(), f0Start, f0EndRel - f0StartRel);
- ptr++;
- offset++;
- } while (true);
- }
- /**
- * Sort using quick sort
- */
- if (tPointers.length > 0) {
- sort(tPointers, 0, totalTCount);
- }
- }
-
- private void sort(int[] tPointers, int offset, int length) {
- int m = offset + (length >> 1);
- int mTable = tPointers[m * 3];
- int mRow = tPointers[m * 3 + 1];
- int mNormKey = tPointers[m * 3 + 2];
-
- table.getTuplePointer(mTable, mRow, storedTuplePointer);
- int mFrame = storedTuplePointer.frameIndex;
- int mTuple = storedTuplePointer.tupleIndex;
- storedKeysAccessor1.reset(frames.get(mFrame));
-
- int a = offset;
- int b = a;
- int c = offset + length - 1;
- int d = c;
- while (true) {
- while (b <= c) {
- int bTable = tPointers[b * 3];
- int bRow = tPointers[b * 3 + 1];
- int bNormKey = tPointers[b * 3 + 2];
- int cmp = 0;
- if (bNormKey != mNormKey) {
- cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
- } else {
- table.getTuplePointer(bTable, bRow, storedTuplePointer);
- int bFrame = storedTuplePointer.frameIndex;
- int bTuple = storedTuplePointer.tupleIndex;
- storedKeysAccessor2.reset(frames.get(bFrame));
- cmp = ftpcTuple.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
- }
- if (cmp > 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, a++, b);
- }
- ++b;
- }
- while (c >= b) {
- int cTable = tPointers[c * 3];
- int cRow = tPointers[c * 3 + 1];
- int cNormKey = tPointers[c * 3 + 2];
- int cmp = 0;
- if (cNormKey != mNormKey) {
- cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
- } else {
- table.getTuplePointer(cTable, cRow, storedTuplePointer);
- int cFrame = storedTuplePointer.frameIndex;
- int cTuple = storedTuplePointer.tupleIndex;
- storedKeysAccessor2.reset(frames.get(cFrame));
- cmp = ftpcTuple.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
- }
- if (cmp < 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, c, d--);
- }
- --c;
- }
- if (b > c)
- break;
- swap(tPointers, b++, c--);
- }
-
- int s;
- int n = offset + length;
- s = Math.min(a - offset, b - a);
- vecswap(tPointers, offset, b - s, s);
- s = Math.min(d - c, n - d - 1);
- vecswap(tPointers, b, n - s, s);
-
- if ((s = b - a) > 1) {
- sort(tPointers, offset, s);
- }
- if ((s = d - c) > 1) {
- sort(tPointers, n - s, s);
- }
- }
-
- private void swap(int x[], int a, int b) {
- for (int i = 0; i < 3; ++i) {
- int t = x[a * 3 + i];
- x[a * 3 + i] = x[b * 3 + i];
- x[b * 3 + i] = t;
- }
- }
-
- private void vecswap(int x[], int a, int b, int n) {
- for (int i = 0; i < n; i++, a++, b++) {
- swap(x, a, b);
- }
- }
-
- @Override
- public void close() {
- dataFrameCount = -1;
- tPointers = null;
- table.close();
- frames.clear();
- }
- };
- }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
similarity index 99%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index b20197d..af68afb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
import java.nio.ByteBuffer;
import java.util.ArrayList;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregator.java
deleted file mode 100644
index b0da898..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregator.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-public interface IAccumulatingAggregator {
- /**
- * Called once per aggregator before calling accumulate for the first time.
- *
- * @param accessor
- * - Accessor to the data tuple.
- * @param tIndex
- * - Index of the tuple in the accessor.
- * @throws HyracksDataException
- */
- public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
-
- /**
- * Called once per tuple that belongs to this group.
- *
- * @param accessor
- * - Accessor to data tuple.
- * @param tIndex
- * - Index of tuple in the accessor.
- * @throws HyracksDataException
- */
- public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
-
- /**
- * Called finally to emit output. This method is called until it returns true. The method is free to
- * write out output to the provided appender until there is no more space and return false. It is the
- * caller's responsibility to flush and make room in the appender before this method is called again.
- *
- * @param appender
- * - Appender to write output to.
- * @param accessor
- * - Accessor to access the key.
- * @param tIndex
- * - Tuple index of the key in the accessor.
- * @param keyFieldIndexes
- * - Field indexes of the key field.
- * @return true if all output is written, false if the appender is full.
- * @throws HyracksDataException
- */
- public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
- throws HyracksDataException;
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
deleted file mode 100644
index 978f671..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IAccumulatingAggregatorFactory extends Serializable {
- IAccumulatingAggregator createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDescriptor) throws HyracksDataException;
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java
similarity index 94%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java
index 3851f26..858c760 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
import java.io.Serializable;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
similarity index 98%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index dc2a30e..4167de1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
similarity index 95%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index a0802c8..339c29f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
import java.io.Serializable;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
similarity index 98%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index c4df536..c42d29a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
import java.io.DataOutput;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
similarity index 95%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
index 0e2d530..ee35c5e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
import java.io.Serializable;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
index 7bd8cd8..ff80a23 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
@@ -21,9 +21,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-/**
- * @author jarodwen
- */
public interface ISpillableTable {
public void close();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java
index 0ff1d1d..de9fac5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java
@@ -21,15 +21,11 @@
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
-/**
- * @author jarodwen
- */
public interface ISpillableTableFactory extends Serializable {
ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int[] keyFields,
IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
- IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
+ IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index c678ac1..8ff4942 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -29,15 +29,17 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-public class PreclusteredGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class PreclusteredGroupOperatorDescriptor extends
+ AbstractSingleActivityOperatorDescriptor {
private final int[] groupFields;
private final IBinaryComparatorFactory[] comparatorFactories;
- private final IAccumulatingAggregatorFactory aggregatorFactory;
+ private final IAggregatorDescriptorFactory aggregatorFactory;
private static final long serialVersionUID = 1L;
- public PreclusteredGroupOperatorDescriptor(JobSpecification spec, int[] groupFields,
- IBinaryComparatorFactory[] comparatorFactories, IAccumulatingAggregatorFactory aggregatorFactory,
+ public PreclusteredGroupOperatorDescriptor(JobSpecification spec,
+ int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
+ IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor recordDescriptor) {
super(spec, 1, 1);
this.groupFields = groupFields;
@@ -47,32 +49,40 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions) throws HyracksDataException {
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) throws HyracksDataException {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
- final IAccumulatingAggregator aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
- recordDescriptors[0]);
+ final RecordDescriptor inRecordDesc = recordDescProvider
+ .getInputRecordDescriptor(getOperatorId(), 0);
+ final IAggregatorDescriptor aggregator = aggregatorFactory
+ .createAggregator(ctx, inRecordDesc, recordDescriptors[0],
+ groupFields, groupFields);
final ByteBuffer copyFrame = ctx.allocateFrame();
- final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+ final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(
+ ctx.getFrameSize(), inRecordDesc);
copyFrameAccessor.reset(copyFrame);
ByteBuffer outFrame = ctx.allocateFrame();
- final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ final FrameTupleAppender appender = new FrameTupleAppender(
+ ctx.getFrameSize());
appender.reset(outFrame, true);
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private PreclusteredGroupWriter pgw;
@Override
public void open() throws HyracksDataException {
- pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc, writer);
+ pgw = new PreclusteredGroupWriter(ctx, groupFields,
+ comparators, aggregator, inRecordDesc, writer);
pgw.open();
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
pgw.nextFrame(buffer);
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
index df7f9f9..3f2efaf 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -28,7 +28,8 @@
public class PreclusteredGroupWriter implements IFrameWriter {
private final int[] groupFields;
private final IBinaryComparator[] comparators;
- private final IAccumulatingAggregator aggregator;
+ private final IAggregatorDescriptor aggregator;
+ private final AggregateState aggregateState;
private final IFrameWriter writer;
private final ByteBuffer copyFrame;
private final FrameTupleAccessor inFrameAccessor;
@@ -38,10 +39,11 @@
private boolean first;
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
- IAccumulatingAggregator aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
+ IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
this.groupFields = groupFields;
this.comparators = comparators;
this.aggregator = aggregator;
+ this.aggregateState = aggregator.createAggregateStates();
this.writer = writer;
copyFrame = ctx.allocateFrame();
inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
@@ -64,7 +66,7 @@
int nTuples = inFrameAccessor.getTupleCount();
for (int i = 0; i < nTuples; ++i) {
if (first) {
- aggregator.init(inFrameAccessor, i);
+ aggregator.init(null, inFrameAccessor, i, aggregateState);
first = false;
} else {
if (i == 0) {
@@ -72,8 +74,8 @@
} else {
switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
}
+
}
- aggregator.accumulate(inFrameAccessor, i);
}
FrameUtils.copy(buffer, copyFrame);
}
@@ -82,16 +84,18 @@
FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
writeOutput(prevTupleAccessor, prevTupleIndex);
- aggregator.init(currTupleAccessor, currTupleIndex);
+ aggregator.init(null, currTupleAccessor, currTupleIndex, aggregateState);
+ } else {
+ aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
}
}
private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
throws HyracksDataException {
- if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
+ if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
FrameUtils.flushFrame(appender.getBuffer(), writer);
appender.reset(appender.getBuffer(), true);
- if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
+ if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
throw new IllegalStateException();
}
}
@@ -124,6 +128,7 @@
FrameUtils.flushFrame(appender.getBuffer(), writer);
}
}
+ aggregateState.close();
writer.close();
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
similarity index 95%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
index 3ecb968..d2d1b03 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
import java.io.DataOutput;
import java.io.IOException;
@@ -23,10 +23,10 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
/**
*
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
similarity index 93%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index a56c669..657af3b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
import java.io.DataOutput;
import java.io.IOException;
@@ -23,10 +23,10 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
/**
*
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
similarity index 94%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index 0695c5a..6045687 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
import java.io.DataOutput;
import java.io.IOException;
@@ -23,10 +23,10 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
/**
*
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
similarity index 96%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index 1ae7aa4..3970e57 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
@@ -26,10 +26,10 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
/**
*
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
similarity index 95%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 0ae62e5..11d236c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
import java.io.DataOutput;
@@ -22,12 +22,12 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
/**
*
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index d8df3a8..753cf0c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -38,16 +38,6 @@
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.CountFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MinMaxStringFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -58,6 +48,16 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
/**
*
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index c0a1d10..1c96755 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -35,9 +35,6 @@
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -47,7 +44,10 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
@@ -78,7 +78,7 @@
spec,
new int[] { 0 },
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
desc2);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID);
@@ -89,8 +89,8 @@
RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -147,7 +147,7 @@
spec,
new int[] { 0 },
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
desc2);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
@@ -158,8 +158,8 @@
RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -216,7 +216,7 @@
spec,
new int[] { 0 },
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
desc2);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
@@ -227,8 +227,8 @@
RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
deleted file mode 100644
index a134a39..0000000
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.tests.spillable;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.junit.Test;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.AvgAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.ConcatAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IntSumAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableGroupingTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
-import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
-
-/**
- *
- */
-public class ExternalAggregateTest extends AbstractIntegrationTest {
-
- final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
- new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/lineitem.tbl"))) });
-
- static final boolean isOutputFile = true;
-
- final RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
-
- final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
- new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|');
-
- private AbstractSingleActivityOperatorDescriptor getPrinter(
- JobSpecification spec, boolean isFile, String prefix)
- throws IOException {
- AbstractSingleActivityOperatorDescriptor printer;
-
- if (!isOutputFile)
- printer = new PrinterOperatorDescriptor(spec);
- else
- printer = new PlainFileWriterOperatorDescriptor(spec,
- new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit(NC1_ID, createTempFile()
- .getAbsolutePath()),
- new FileSplit(NC2_ID, createTempFile()
- .getAbsolutePath()) }), "\t");
-
- return printer;
- }
-
- @Test
- public void hashSingleKeyScalarGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
-
- int[] keyFields = new int[] { 0 };
- int frameLimits = 3;
- int tableSize = 8;
-
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new CountAggregatorDescriptorFactory(),
- new IntSumAggregatorDescriptorFactory(keyFields.length),
- outputRec,
- new HashSpillableGroupingTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- isOutputFile, "hashSingleKeyScalarGroupTest");
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
- public void hashMultipleKeyScalarGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, });
-
- int[] keyFields = new int[] { 0, 9 };
- int frameLimits = 3;
- int tableSize = 8;
-
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new IntSumAggregatorDescriptorFactory(1),
- new IntSumAggregatorDescriptorFactory(keyFields.length),
- outputRec,
- new HashSpillableGroupingTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- isOutputFile, "hashMultipleKeyScalarGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
- public void hashMultipleKeyMultipleScalarGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, });
-
- int[] keyFields = new int[] { 0, 9 };
- int frameLimits = 3;
- int tableSize = 8;
-
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] {
- new IntSumAggregatorDescriptorFactory(1, 2),
- new IntSumAggregatorDescriptorFactory(2, 3) }),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] {
- new IntSumAggregatorDescriptorFactory(2, 2),
- new IntSumAggregatorDescriptorFactory(3, 3) }),
- outputRec,
- new HashSpillableGroupingTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- isOutputFile, "hashMultipleKeyMultipleScalarGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
- public void hashMultipleKeyNonScalarGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
-
- int[] keyFields = new int[] { 0 };
- int frameLimits = 3;
- int tableSize = 8;
-
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new ConcatAggregatorDescriptorFactory(9),
- new ConcatAggregatorDescriptorFactory(keyFields.length),
- outputRec,
- new HashSpillableGroupingTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- isOutputFile, "hashMultipleKeyNonScalarGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
- public void hashMultipleKeyMultipleFieldsGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
-
- int[] keyFields = new int[] { 0, 9 };
- int frameLimits = 3;
- int tableSize = 8;
-
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] {
- new IntSumAggregatorDescriptorFactory(1, 2),
- new IntSumAggregatorDescriptorFactory(2, 3),
- new ConcatAggregatorDescriptorFactory(9, 4) }),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] {
- new IntSumAggregatorDescriptorFactory(2, 2),
- new IntSumAggregatorDescriptorFactory(3, 3),
- new ConcatAggregatorDescriptorFactory(4, 4) }),
- outputRec,
- new HashSpillableGroupingTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- isOutputFile, "hashMultipleKeyMultipleFieldsGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
- public void hashSingleKeyScalarAvgGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
-
- int[] keyFields = new int[] { 0 };
- int frameLimits = 3;
- int tableSize = 8;
-
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new AvgAggregatorDescriptorFactory(1),
- new AvgAggregatorDescriptorFactory(keyFields.length),
- outputRec,
- new HashSpillableGroupingTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- isOutputFile, "hashSingleKeyScalarGroupTest");
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-}
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 61b4ad8..892ae39 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -42,18 +42,6 @@
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.CountFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MultiFieldsAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IntSumAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -67,8 +55,12 @@
import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableGroupingTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
/**
@@ -237,97 +229,8 @@
AbstractOperatorDescriptor grouper;
switch (alg) {
- case 0: // External hash group
- grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keys,
- framesLimit,
- new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE },
- new IntegerNormalizedKeyComputerFactory(),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(
- keys.length) }),
- outDesc,
- new HashSpillableGroupingTableFactory(
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }),
- htSize), false);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
- break;
- case 1: // External sort + pre-cluster
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
- spec, framesLimit, keys, new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
- createPartitionConstraint(spec, sorter, inSplits);
-
- // Connect scan operator with the sorter
- IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
-
- grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keys,
- new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- outDesc);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect sorter with the pre-cluster
- OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(
- spec);
- spec.connect(sortGroupConn, sorter, 0, grouper, 0);
- break;
- case 2: // In-memory hash group
- grouper = new HashGroupOperatorDescriptor(
- spec,
- keys,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- outDesc, htSize);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanConn = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanConn, fileScanner, 0, grouper, 0);
- break;
- case 3: // new external hash graph
- grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.ExternalGroupOperatorDescriptor(
+ case 0: // new external hash graph
+ grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(
spec,
keys,
framesLimit,
@@ -358,7 +261,7 @@
spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
break;
- case 4: // External-sort + new-precluster
+ case 1: // External-sort + new-precluster
ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(
spec, framesLimit, keys, new IBinaryComparatorFactory[] {
// IntegerBinaryComparatorFactory.INSTANCE,
@@ -373,7 +276,7 @@
IntegerBinaryHashFunctionFactory.INSTANCE }));
spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
- grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.PreclusteredGroupOperatorDescriptor(
+ grouper = new edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor(
spec,
keys,
new IBinaryComparatorFactory[] {
@@ -390,8 +293,8 @@
spec);
spec.connect(sortGroupConn2, sorter2, 0, grouper, 0);
break;
- case 5: // Inmem
- grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor(
+ case 2: // Inmem
+ grouper = new HashGroupOperatorDescriptor(
spec,
keys,
new FieldHashPartitionComputerFactory(keys,
@@ -416,7 +319,7 @@
spec.connect(scanConn2, fileScanner, 0, grouper, 0);
break;
default:
- grouper = new ExternalGroupOperatorDescriptor(
+ grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(
spec,
keys,
framesLimit,
@@ -424,19 +327,18 @@
// IntegerBinaryComparatorFactory.INSTANCE,
IntegerBinaryComparatorFactory.INSTANCE },
new IntegerNormalizedKeyComputerFactory(),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(
- keys.length) }),
- outDesc,
- new HashSpillableGroupingTableFactory(
- new FieldHashPartitionComputerFactory(keys,
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(
+ false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(
+ keys.length, false) }), outDesc,
+ new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keys,
new IBinaryHashFunctionFactory[] {
// IntegerBinaryHashFunctionFactory.INSTANCE,
IntegerBinaryHashFunctionFactory.INSTANCE }),
htSize), false);
-
+
createPartitionConstraint(spec, grouper, outSplits);
// Connect scanner with the grouper
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index 0fa6424..8cac822 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -39,9 +39,6 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -51,7 +48,10 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
import edu.uci.ics.hyracks.examples.text.WordTupleParserFactory;
@@ -143,8 +143,8 @@
gBy = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
groupResultDesc, htSize);
createPartitionConstraint(spec, gBy, outSplits);
IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -166,8 +166,8 @@
gBy = new PreclusteredGroupOperatorDescriptor(spec, keys,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
groupResultDesc);
createPartitionConstraint(spec, gBy, outSplits);
OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index a43aafb..9e88f8c 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -45,9 +45,6 @@
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -57,6 +54,9 @@
import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
@@ -316,8 +316,8 @@
new int[] { 6 },
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
groupResultDesc, 16);
createPartitionConstraint(spec, gby, resultSplits);