Reformat the source code
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@981 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
index 000e459..1744b99 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
@@ -88,8 +88,8 @@
protected abstract IRecordWriter createRecordWriter(FileSplit fileSplit, int index) throws Exception;
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new FileWriteOperator(partition),
recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
}
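
The method rewrapped above is the per-partition hook of an abstract file-writing descriptor: the base class drives the operator lifecycle, and each subclass only decides how a partition's IRecordWriter is built from its FileSplit. A minimal plain-Java sketch of that factory pattern (the names below are hypothetical stand-ins, not the Hyracks API):

    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.Writer;
    import java.util.Arrays;

    // Hypothetical analogue of the descriptor/writer split: the abstract base
    // drives the write lifecycle, each subclass only supplies the writer for
    // its partition.
    abstract class PartitionedWriterSketch {

        protected abstract Writer createRecordWriter(String basePath, int partition) throws IOException;

        void writeAll(String basePath, int partition, Iterable<String> records) throws IOException {
            Writer w = createRecordWriter(basePath, partition);
            try {
                for (String r : records) {
                    w.write(r);
                    w.write('\n');
                }
            } finally {
                w.close();
            }
        }

        public static void main(String[] args) throws IOException {
            PartitionedWriterSketch sketch = new PartitionedWriterSketch() {
                @Override
                protected Writer createRecordWriter(String basePath, int partition) throws IOException {
                    return new FileWriter(basePath + "." + partition); // one file per partition
                }
            };
            sketch.writeAll("out.txt", 0, Arrays.asList("a", "b"));
        }
    }
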
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 0096576..7cf437d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -86,11 +86,7 @@
}
private enum State {
- INIT,
- IN_RECORD,
- EOR,
- CR,
- EOF
+ INIT, IN_RECORD, EOR, CR, EOF
}
private class FieldCursor {
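
The State enum collapsed onto one line above drives the record scanner: IN_RECORD accumulates bytes, EOR marks a completed record, and CR remembers a carriage return that may be the first half of a CRLF pair. A standalone sketch of that style of state machine (simplified: the field splitting and quote handling done by the real FieldCursor are omitted):

    import java.io.IOException;
    import java.io.Reader;
    import java.io.StringReader;
    import java.util.ArrayList;
    import java.util.List;

    public class DelimitedScannerSketch {
        private enum State { INIT, IN_RECORD, EOR, CR, EOF }

        // Splits the input into records on LF, CR, or CRLF, mirroring the
        // transitions the parser's State enum encodes.
        static List<String> records(Reader in) throws IOException {
            List<String> out = new ArrayList<String>();
            StringBuilder cur = new StringBuilder();
            State state = State.INIT;
            while (state != State.EOF) {
                int ch = in.read();
                if (ch < 0) {
                    if (state == State.IN_RECORD) {
                        out.add(cur.toString()); // last record had no terminator
                    }
                    state = State.EOF;
                } else if (ch == '\r') {
                    out.add(cur.toString());
                    cur.setLength(0);
                    state = State.CR;            // may be followed by '\n'
                } else if (ch == '\n') {
                    if (state != State.CR) {     // bare LF ends a record
                        out.add(cur.toString());
                        cur.setLength(0);
                    }                            // CRLF: record already emitted at CR
                    state = State.EOR;
                } else {
                    cur.append((char) ch);
                    state = State.IN_RECORD;
                }
            }
            return out;
        }

        public static void main(String[] args) throws IOException {
            System.out.println(records(new StringReader("a,b\r\nc,d\ne,f")));
            // prints [a,b, c,d, e,f]
        }
    }
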
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
index d85f437..76cb0ae 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -44,8 +44,8 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
final FileSplit split = fileSplitProvider.getFileSplits()[partition];
final ITupleParser tp = tupleParserFactory.createTupleParser(ctx);
return new AbstractUnaryOutputSourceOperatorNodePushable() {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
index 7ce8594..8397d29 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
@@ -39,8 +39,8 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
final FileSplit[] splits = fileSplitProvider.getFileSplits();
return new AbstractUnaryInputSinkOperatorNodePushable() {
private OutputStream out;
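
The sink above opens one OutputStream per partition (hence the `final int partition` in the reformatted signature) and, as the `private OutputStream out` context line suggests, dumps each incoming frame to it verbatim. A standalone sketch of that frame-dumping sink (the file naming is illustrative):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;

    // Standalone analogue of the anonymous sink above: one output stream per
    // partition, each incoming frame written out as raw bytes.
    public class FrameFileSinkSketch {

        private final OutputStream out;

        public FrameFileSinkSketch(String path, int partition) throws IOException {
            this.out = new FileOutputStream(path + "." + partition); // naming is illustrative
        }

        // The analogue of nextFrame(ByteBuffer): frames are fixed-size, so the
        // file becomes a raw concatenation of whole frames.
        public void nextFrame(ByteBuffer frame) throws IOException {
            out.write(frame.array(), 0, frame.capacity());
        }

        public void close() throws IOException {
            out.close();
        }
    }
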
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 47a3158..6bf29d2 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -56,8 +56,15 @@
this.delim = delim;
}
- /* (non-Javadoc)
- * @see edu.uci.ics.hyracks.api.dataflow.IActivityNode#createPushRuntime(edu.uci.ics.hyracks.api.context.IHyracksContext, edu.uci.ics.hyracks.api.job.IOperatorEnvironment, edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider, int, int)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * edu.uci.ics.hyracks.api.dataflow.IActivityNode#createPushRuntime(edu.
+ * uci.ics.hyracks.api.context.IHyracksContext,
+ * edu.uci.ics.hyracks.api.job.IOperatorEnvironment,
+ * edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider, int,
+ * int)
*/
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
index 38ac1a3..af345aa 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
@@ -69,11 +69,14 @@
@Override
protected void configure() throws Exception {
- // currently a no-op, but is meant to initialize , if required before it is asked
+        // currently a no-op, but is meant to initialize, if required, before it
+ // is asked
// to create a record reader
- // this is executed at the node and is useful for operators that could not be
- // initialized from the client completely, because of lack of information specific
- // to the node where the operator gets executed.
+ // this is executed at the node and is useful for operators that could
+ // not be
+ // initialized from the client completely, because of lack of
+ // information specific
+ // to the node where the operator gets executed.
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
index 79a62d1..e72f85c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
@@ -20,25 +20,25 @@
*
*/
public class AggregateState implements Serializable {
-
+
private static final long serialVersionUID = 1L;
-
+
public Object state = null;
-
- public AggregateState(){
+
+ public AggregateState() {
state = null;
}
-
- public AggregateState(Object obj){
+
+ public AggregateState(Object obj) {
state = obj;
}
public void reset() {
state = null;
}
-
+
public void close() {
state = null;
}
-
+
}
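
AggregateState, reformatted above, is deliberately untyped: an aggregator parks whatever per-group scratch object it needs in the public `state` field and casts it back on each call. A sketch of how a running-sum aggregator might use it (SumState is a hypothetical holder, not part of this change):

    // Hypothetical per-group scratch object kept in AggregateState.state.
    class SumState {
        long sum;
        long count;
    }

    class SumStateUsageSketch {
        static AggregateState init() {
            return new AggregateState(new SumState()); // state = the holder
        }

        static void aggregate(AggregateState s, long value) {
            SumState ss = (SumState) s.state; // unchecked by design
            ss.sum += value;
            ss.count++;
        }

        static long finish(AggregateState s) {
            long result = ((SumState) s.state).sum;
            s.close(); // drops the reference, as reset()/close() above do
            return result;
        }
    }
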
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index 07da19b..370f6d0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -120,6 +120,6 @@
@Override
public void fail() throws HyracksDataException {
// TODO Auto-generated method stub
-
+
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 2edbf97..0cc47b8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -74,14 +74,10 @@
private final ISpillableTableFactory spillableTableFactory;
private final boolean isOutputSorted;
- public ExternalGroupOperatorDescriptor(JobSpecification spec,
- int[] keyFields, int framesLimit,
- IBinaryComparatorFactory[] comparatorFactories,
- INormalizedKeyComputerFactory firstNormalizerFactory,
- IAggregatorDescriptorFactory aggregatorFactory,
- IAggregatorDescriptorFactory mergerFactory,
- RecordDescriptor recordDescriptor,
- ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
+ public ExternalGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+ IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
+ IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+ RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
super(spec, 1, 1);
this.framesLimit = framesLimit;
if (framesLimit <= 1) {
@@ -89,9 +85,7 @@
* Minimum of 2 frames: 1 for input records, and 1 for output
* aggregation results.
*/
- throw new IllegalStateException(
- "frame limit should at least be 2, but it is "
- + framesLimit + "!");
+ throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
}
this.aggregatorFactory = aggregatorFactory;
this.mergerFactory = mergerFactory;
@@ -117,10 +111,8 @@
*/
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(
- getOperatorId(), AGGREGATE_ACTIVITY_ID));
- MergeActivity mergeAct = new MergeActivity(new ActivityId(odId,
- MERGE_ACTIVITY_ID));
+ AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
+ MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
builder.addActivity(aggregateAct);
builder.addSourceEdge(0, aggregateAct, 0);
@@ -162,35 +154,28 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions)
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- final FrameTupleAccessor accessor = new FrameTupleAccessor(
- ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0));
+ final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private AggregateActivityState state;
@Override
public void open() throws HyracksDataException {
- state = new AggregateActivityState(ctx.getJobletContext()
- .getJobId(), new TaskId(getActivityId(), partition));
+ state = new AggregateActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
state.runs = new LinkedList<RunFileReader>();
- state.gTable = spillableTableFactory.buildSpillableTable(
- ctx, keyFields, comparatorFactories,
+ state.gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
firstNormalizerFactory, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0), recordDescriptors[0],
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
ExternalGroupOperatorDescriptor.this.framesLimit);
state.gTable.reset();
}
@Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
@@ -230,15 +215,12 @@
private void flushFramesToRun() throws HyracksDataException {
FileReference runFile;
try {
- runFile = ctx.getJobletContext()
- .createManagedWorkspaceFile(
- ExternalGroupOperatorDescriptor.class
- .getSimpleName());
+ runFile = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class.getSimpleName());
} catch (IOException e) {
throw new HyracksDataException(e);
}
- RunFileWriter writer = new RunFileWriter(runFile,
- ctx.getIOManager());
+ RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
writer.open();
try {
state.gTable.sortFrames();
@@ -264,15 +246,12 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions)
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i]
- .createBinaryComparator();
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
}
int[] keyFieldsInPartialResults = new int[keyFields.length];
@@ -280,12 +259,9 @@
keyFieldsInPartialResults[i] = i;
}
- final IAggregatorDescriptor aggregator = mergerFactory
- .createAggregator(ctx, recordDescriptors[0],
- recordDescriptors[0], keyFields,
- keyFieldsInPartialResults);
- final AggregateState aggregateState = aggregator
- .createAggregateStates();
+ final IAggregatorDescriptor aggregator = mergerFactory.createAggregator(ctx, recordDescriptors[0],
+ recordDescriptors[0], keyFields, keyFieldsInPartialResults);
+ final AggregateState aggregateState = aggregator.createAggregateStates();
final int[] storedKeys = new int[keyFields.length];
/**
@@ -295,8 +271,7 @@
storedKeys[i] = i;
}
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
- recordDescriptors[0].getFields().length);
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
/**
@@ -308,8 +283,7 @@
* Output frame.
*/
private ByteBuffer outFrame, writerFrame;
- private final FrameTupleAppender outAppender = new FrameTupleAppender(
- ctx.getFrameSize());
+ private final FrameTupleAppender outAppender = new FrameTupleAppender(ctx.getFrameSize());
private FrameTupleAppender writerAppender;
private LinkedList<RunFileReader> runs;
@@ -326,14 +300,12 @@
private int[] currentFrameIndexInRun;
private int[] currentRunFrames;
- private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
- ctx.getFrameSize(), recordDescriptors[0]);
+ private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescriptors[0]);
public void initialize() throws HyracksDataException {
- aggState = (AggregateActivityState) ctx
- .getTaskState(new TaskId(new ActivityId(
- getOperatorId(), AGGREGATE_ACTIVITY_ID),
- partition));
+ aggState = (AggregateActivityState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ AGGREGATE_ACTIVITY_ID), partition));
runs = aggState.runs;
writer.open();
try {
@@ -368,8 +340,7 @@
}
}
- private void doPass(LinkedList<RunFileReader> runs)
- throws HyracksDataException {
+ private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
FileReference newRun = null;
IFrameWriter writer = this.writer;
boolean finalPass = false;
@@ -384,10 +355,8 @@
runNumber = runs.size();
} else {
runNumber = framesLimit - 2;
- newRun = ctx.getJobletContext()
- .createManagedWorkspaceFile(
- ExternalGroupOperatorDescriptor.class
- .getSimpleName());
+ newRun = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class.getSimpleName());
writer = new RunFileWriter(newRun, ctx.getIOManager());
writer.open();
}
@@ -399,12 +368,10 @@
* the ones fit into the inFrames
*/
RunFileReader[] runFileReaders = new RunFileReader[runNumber];
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
- .size()];
+ FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
- ctx.getFrameSize(), recordDescriptors[0],
- runNumber, comparator);
+ ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
+ recordDescriptors[0], runNumber, comparator);
/**
* current tuple index in each run
*/
@@ -417,26 +384,19 @@
runFileReaders[runIndex].open();
currentRunFrames[runIndex] = 0;
- currentFrameIndexInRun[runIndex] = runIndex
- * runFrameLimit;
+ currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
for (int j = 0; j < runFrameLimit; j++) {
- int frameIndex = currentFrameIndexInRun[runIndex]
- + j;
- if (runFileReaders[runIndex].nextFrame(inFrames
- .get(frameIndex))) {
- tupleAccessors[frameIndex] = new FrameTupleAccessor(
- ctx.getFrameSize(),
+ int frameIndex = currentFrameIndexInRun[runIndex] + j;
+ if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
+ tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
recordDescriptors[0]);
- tupleAccessors[frameIndex].reset(inFrames
- .get(frameIndex));
+ tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
currentRunFrames[runIndex]++;
if (j == 0)
- setNextTopTuple(runIndex, tupleIndices,
- runFileReaders, tupleAccessors,
+ setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors,
topTuples);
} else {
- closeRun(runIndex, runFileReaders,
- tupleAccessors);
+ closeRun(runIndex, runFileReaders, tupleAccessors);
break;
}
}
@@ -454,12 +414,9 @@
int runIndex = topTuples.peek().getRunid();
FrameTupleAccessor fta = top.getAccessor();
- int currentTupleInOutFrame = outFrameAccessor
- .getTupleCount() - 1;
+ int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
if (currentTupleInOutFrame < 0
- || compareFrameTuples(fta, tupleIndex,
- outFrameAccessor,
- currentTupleInOutFrame) != 0) {
+ || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
/**
* Initialize the first output record Reset the
* tuple builder
@@ -467,18 +424,13 @@
tupleBuilder.reset();
- aggregator.init(tupleBuilder, fta, tupleIndex,
- aggregateState);
+ aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState);
- if (!outAppender.appendSkipEmptyField(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
+ if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
flushOutFrame(writer, finalPass);
- if (!outAppender.appendSkipEmptyField(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
+ if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
throw new HyracksDataException(
"The partial result is too large to be initialized in a frame.");
}
@@ -491,14 +443,12 @@
* outFrame
*/
- aggregator.aggregate(fta, tupleIndex,
- outFrameAccessor,
- currentTupleInOutFrame, aggregateState);
+ aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame,
+ aggregateState);
}
tupleIndices[runIndex]++;
- setNextTopTuple(runIndex, tupleIndices,
- runFileReaders, tupleAccessors, topTuples);
+ setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
}
if (outAppender.getTupleCount() > 0) {
@@ -523,12 +473,10 @@
}
}
- private void flushOutFrame(IFrameWriter writer, boolean isFinal)
- throws HyracksDataException {
+ private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
if (finalTupleBuilder == null) {
- finalTupleBuilder = new ArrayTupleBuilder(
- recordDescriptors[0].getFields().length);
+ finalTupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
}
if (writerFrame == null) {
@@ -536,8 +484,7 @@
}
if (writerAppender == null) {
- writerAppender = new FrameTupleAppender(
- ctx.getFrameSize());
+ writerAppender = new FrameTupleAppender(ctx.getFrameSize());
writerAppender.reset(writerFrame, true);
}
@@ -549,21 +496,19 @@
if (isFinal) {
- aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i,
- aggregateState);
-
+ aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
} else {
- aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i,
- aggregateState);
+ aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
}
-
-
- if(!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(), finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+
+ if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
FrameUtils.flushFrame(writerFrame, writer);
writerAppender.reset(writerFrame, true);
- if(!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(), finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+ if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
throw new HyracksDataException(
"Aggregation output is too large to be fit into a frame.");
}
@@ -573,19 +518,16 @@
FrameUtils.flushFrame(writerFrame, writer);
writerAppender.reset(writerFrame, true);
}
-
+
outAppender.reset(outFrame, true);
}
- private void setNextTopTuple(int runIndex, int[] tupleIndices,
- RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors,
- ReferencedPriorityQueue topTuples)
+ private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
throws HyracksDataException {
int runStart = runIndex * runFrameLimit;
boolean existNext = false;
- if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null
- || runCursors[runIndex] == null) {
+ if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
/**
* run already closed
*/
@@ -595,8 +537,7 @@
* not the last frame for this run
*/
existNext = true;
- if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]]
- .getTupleCount()) {
+ if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
tupleIndices[runIndex] = 0;
currentFrameIndexInRun[runIndex]++;
}
@@ -619,15 +560,13 @@
*/
currentRunFrames[runIndex] = 0;
for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
- ByteBuffer buffer = tupleAccessors[frameOffset]
- .getBuffer();
+ ByteBuffer buffer = tupleAccessors[frameOffset].getBuffer();
if (runCursors[runIndex].nextFrame(buffer)) {
tupleAccessors[frameOffset].reset(buffer);
if (tupleAccessors[frameOffset].getTupleCount() > 0) {
existNext = true;
} else {
- throw new IllegalStateException(
- "illegal: empty run file");
+ throw new IllegalStateException("illegal: empty run file");
}
currentRunFrames[runIndex]++;
} else {
@@ -637,10 +576,8 @@
}
if (existNext) {
- topTuples
- .popAndReplace(
- tupleAccessors[currentFrameIndexInRun[runIndex]],
- tupleIndices[runIndex]);
+ topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]],
+ tupleIndices[runIndex]);
} else {
topTuples.pop();
closeRun(runIndex, runCursors, tupleAccessors);
@@ -656,8 +593,7 @@
* @param tupleAccessor
* @throws HyracksDataException
*/
- private void closeRun(int index, RunFileReader[] runCursors,
- IFrameTupleAccessor[] tupleAccessor)
+ private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
throws HyracksDataException {
if (runCursors[index] != null) {
runCursors[index].close();
@@ -666,18 +602,15 @@
}
}
- private int compareFrameTuples(IFrameTupleAccessor fta1,
- int j1, IFrameTupleAccessor fta2, int j2) {
+ private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
byte[] b1 = fta1.getBuffer().array();
byte[] b2 = fta2.getBuffer().array();
for (int f = 0; f < keyFields.length; ++f) {
int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1)
- + fta1.getFieldSlotsLength()
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ fta1.getFieldStartOffset(j1, fIdx);
int l1 = fta1.getFieldLength(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2)
- + fta2.getFieldSlotsLength()
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ fta2.getFieldStartOffset(j2, fIdx);
int l2_start = fta2.getFieldStartOffset(j2, fIdx);
int l2_end = fta2.getFieldEndOffset(j2, fIdx);
@@ -693,32 +626,25 @@
return op;
}
- private Comparator<ReferenceEntry> createEntryComparator(
- final IBinaryComparator[] comparators) {
+ private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
return new Comparator<ReferenceEntry>() {
@Override
public int compare(ReferenceEntry o1, ReferenceEntry o2) {
- FrameTupleAccessor fta1 = (FrameTupleAccessor) o1
- .getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) o2
- .getAccessor();
+ FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
+ FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
int j1 = o1.getTupleIndex();
int j2 = o2.getTupleIndex();
byte[] b1 = fta1.getBuffer().array();
byte[] b2 = fta2.getBuffer().array();
for (int f = 0; f < keyFields.length; ++f) {
int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1)
- + fta1.getFieldSlotsLength()
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx)
- - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2)
- + fta2.getFieldSlotsLength()
+ int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx)
- - fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
if (c != 0) {
return c;
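
Most of the churn above is in the merge activity, which is a k-way merge: the current head tuple of every run sits in a ReferencedPriorityQueue, the smallest is popped, either initialized as a new output group or aggregated into the previous output tuple, and then replaced by the next tuple from the same run (setNextTopTuple), with the run closed once it is exhausted (closeRun). A standalone sketch of that merge pattern over plain iterators (simplified: no frames, no grouping or partial-result aggregation):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMergeSketch {

        // One queue entry per run: the run's current head value plus the run
        // it came from, echoing ReferenceEntry's (accessor, tupleIndex, runid).
        static final class Head {
            final int value;
            final Iterator<Integer> run;

            Head(int value, Iterator<Integer> run) {
                this.value = value;
                this.run = run;
            }
        }

        static List<Integer> merge(List<List<Integer>> sortedRuns) {
            PriorityQueue<Head> topTuples = new PriorityQueue<Head>(Math.max(1, sortedRuns.size()),
                    new Comparator<Head>() {
                        @Override
                        public int compare(Head a, Head b) {
                            return a.value < b.value ? -1 : a.value == b.value ? 0 : 1;
                        }
                    });
            for (List<Integer> run : sortedRuns) {
                Iterator<Integer> it = run.iterator();
                if (it.hasNext()) {
                    topTuples.add(new Head(it.next(), it)); // prime with each run's head
                }
            }
            List<Integer> out = new ArrayList<Integer>();
            while (!topTuples.isEmpty()) {
                Head top = topTuples.poll(); // pop the globally smallest head
                out.add(top.value);
                if (top.run.hasNext()) {
                    // popAndReplace: refill from the same run
                    topTuples.add(new Head(top.run.next(), top.run));
                }
                // else: run exhausted, the analogue of closeRun()
            }
            return out;
        }

        public static void main(String[] args) {
            List<List<Integer>> runs = new ArrayList<List<Integer>>();
            runs.add(Arrays.asList(1, 4, 7));
            runs.add(Arrays.asList(2, 5));
            runs.add(Arrays.asList(3, 6, 8));
            System.out.println(merge(runs)); // prints [1, 2, 3, 4, 5, 6, 7, 8]
        }
    }
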
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java
index ee29c56..2346160 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java
@@ -25,20 +25,17 @@
*/
public class FrameToolsForGroupers {
- public static void writeFields(byte[] buf, int offset, int length,
- ArrayTupleBuilder tupleBuilder) throws HyracksDataException {
- writeFields(buf, offset, length, tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
+ public static void writeFields(byte[] buf, int offset, int length, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ writeFields(buf, offset, length, tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize());
}
- public static void writeFields(byte[] buf, int offset, int length,
- int[] fieldsOffset, byte[] data, int dataOffset, int dataLength)
- throws HyracksDataException {
+ public static void writeFields(byte[] buf, int offset, int length, int[] fieldsOffset, byte[] data, int dataOffset,
+ int dataLength) throws HyracksDataException {
if (dataLength + 4 * fieldsOffset.length > length) {
- throw new HyracksDataException(
- "Out of buffer bound: try to write too much data ("
- + dataLength + ") to the given bound (" + length
- + ").");
+ throw new HyracksDataException("Out of buffer bound: try to write too much data (" + dataLength
+ + ") to the given bound (" + length + ").");
}
ByteBuffer buffer = ByteBuffer.wrap(buf, offset, length);
@@ -48,60 +45,49 @@
buffer.put(data, dataOffset, dataLength);
}
- public static void updateFrameMetaForNewTuple(ByteBuffer buffer,
- int addedTupleLength) throws HyracksDataException {
- int currentTupleCount = buffer.getInt(FrameHelper
- .getTupleCountOffset(buffer.capacity()));
- int currentTupleEndOffset = buffer
- .getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
- * currentTupleCount);
+ public static void updateFrameMetaForNewTuple(ByteBuffer buffer, int addedTupleLength) throws HyracksDataException {
+ int currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
+ int currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+ * currentTupleCount);
int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
// update tuple end offset
- buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
- * (currentTupleCount + 1), newTupleEndOffset);
+ buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4 * (currentTupleCount + 1),
+ newTupleEndOffset);
// Update the tuple count
- buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()),
- currentTupleCount + 1);
+ buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()), currentTupleCount + 1);
}
- public static void updateFrameMetaForNewTuple(ByteBuffer buffer,
- int addedTupleLength, boolean isReset) throws HyracksDataException {
+ public static void updateFrameMetaForNewTuple(ByteBuffer buffer, int addedTupleLength, boolean isReset)
+ throws HyracksDataException {
int currentTupleCount;
int currentTupleEndOffset;
if (isReset) {
currentTupleCount = 0;
currentTupleEndOffset = 0;
} else {
- currentTupleCount = buffer.getInt(FrameHelper
- .getTupleCountOffset(buffer.capacity()));
- currentTupleEndOffset = buffer.getInt(FrameHelper
- .getTupleCountOffset(buffer.capacity())
- - 4
+ currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
+ currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
* currentTupleCount);
}
int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
// update tuple end offset
- buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
- * (currentTupleCount + 1), newTupleEndOffset);
+ buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4 * (currentTupleCount + 1),
+ newTupleEndOffset);
// Update the tuple count
- buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()),
- currentTupleCount + 1);
+ buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()), currentTupleCount + 1);
}
public static boolean isFrameOverflowing(ByteBuffer buffer, int length, boolean isReset)
throws HyracksDataException {
-
- int currentTupleCount = buffer.getInt(FrameHelper
- .getTupleCountOffset(buffer.capacity()));
- if(currentTupleCount == 0 || isReset){
+
+ int currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
+ if (currentTupleCount == 0 || isReset) {
return length + 4 + 4 > buffer.capacity();
}
- int currentTupleEndOffset = buffer
- .getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
- * currentTupleCount);
- return currentTupleEndOffset + length + 4 + (currentTupleCount + 1) * 4 > buffer
- .capacity();
+ int currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+ * currentTupleCount);
+ return currentTupleEndOffset + length + 4 + (currentTupleCount + 1) * 4 > buffer.capacity();
}
}
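
The arithmetic being rewrapped above assumes Hyracks' frame layout: tuple data grows forward from offset 0, a 4-byte tuple count sits at the tail of the frame, and one 4-byte end-offset slot per tuple grows backward from that count. A standalone sketch of the same bookkeeping (assuming getTupleCountOffset(capacity) is capacity - 4, which is the role FrameHelper plays here):

    import java.nio.ByteBuffer;

    public class FrameMetaSketch {

        // Assumption: the tuple count is the last int in the frame, i.e.
        // FrameHelper.getTupleCountOffset(capacity) == capacity - 4.
        static int tupleCountOffset(ByteBuffer b) {
            return b.capacity() - 4;
        }

        // Mirrors updateFrameMetaForNewTuple(buffer, addedTupleLength): write
        // the new tuple's end offset one slot below the previous ones, then
        // bump the count.
        static void addTupleMeta(ByteBuffer b, int tupleLength) {
            int count = b.getInt(tupleCountOffset(b));
            int end = count == 0 ? 0 : b.getInt(tupleCountOffset(b) - 4 * count);
            b.putInt(tupleCountOffset(b) - 4 * (count + 1), end + tupleLength);
            b.putInt(tupleCountOffset(b), count + 1);
        }

        // Mirrors isFrameOverflowing: data end + new tuple + the count slot
        // + one end-offset slot per tuple, including the new one.
        static boolean wouldOverflow(ByteBuffer b, int tupleLength) {
            int count = b.getInt(tupleCountOffset(b));
            int end = count == 0 ? 0 : b.getInt(tupleCountOffset(b) - 4 * count);
            return end + tupleLength + 4 + (count + 1) * 4 > b.capacity();
        }

        public static void main(String[] args) {
            ByteBuffer frame = ByteBuffer.allocate(64);
            addTupleMeta(frame, 20);
            addTupleMeta(frame, 20);
            // Current usage: 40 bytes of data + 4 (count) + 2 * 4 (offsets) = 52.
            // A 12-byte tuple would need 40 + 12 + 4 + 3 * 4 = 68 > 64:
            System.out.println(wouldOverflow(frame, 12)); // prints true
        }
    }
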
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
index ef427d1..43bb0ae 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
@@ -34,242 +34,218 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
class GroupingHashTable {
- /**
- * The pointers in the link store 3 int values for each entry in the
- * hashtable: (bufferIdx, tIndex, accumulatorIdx).
- *
- * @author vinayakb
- */
- private static class Link {
- private static final int INIT_POINTERS_SIZE = 9;
+ /**
+ * The pointers in the link store 3 int values for each entry in the
+ * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+ *
+ * @author vinayakb
+ */
+ private static class Link {
+ private static final int INIT_POINTERS_SIZE = 9;
- int[] pointers;
- int size;
+ int[] pointers;
+ int size;
- Link() {
- pointers = new int[INIT_POINTERS_SIZE];
- size = 0;
- }
+ Link() {
+ pointers = new int[INIT_POINTERS_SIZE];
+ size = 0;
+ }
- void add(int bufferIdx, int tIndex, int accumulatorIdx) {
- while (size + 3 > pointers.length) {
- pointers = Arrays.copyOf(pointers, pointers.length * 2);
- }
- pointers[size++] = bufferIdx;
- pointers[size++] = tIndex;
- pointers[size++] = accumulatorIdx;
- }
- }
+ void add(int bufferIdx, int tIndex, int accumulatorIdx) {
+ while (size + 3 > pointers.length) {
+ pointers = Arrays.copyOf(pointers, pointers.length * 2);
+ }
+ pointers[size++] = bufferIdx;
+ pointers[size++] = tIndex;
+ pointers[size++] = accumulatorIdx;
+ }
+ }
- private static final int INIT_AGG_STATE_SIZE = 8;
- private final IHyracksTaskContext ctx;
+ private static final int INIT_AGG_STATE_SIZE = 8;
+ private final IHyracksTaskContext ctx;
- private final List<ByteBuffer> buffers;
- private final Link[] table;
- /**
- * Aggregate states: a list of states for all groups maintained in the main
- * memory.
- */
- private AggregateState[] aggregateStates;
- private int accumulatorSize;
+ private final List<ByteBuffer> buffers;
+ private final Link[] table;
+ /**
+ * Aggregate states: a list of states for all groups maintained in the main
+ * memory.
+ */
+ private AggregateState[] aggregateStates;
+ private int accumulatorSize;
- private int lastBIndex;
- private final int[] storedKeys;
- private final int[] keys;
- private final IBinaryComparator[] comparators;
- private final FrameTuplePairComparator ftpc;
- private final ITuplePartitionComputer tpc;
- private final IAggregatorDescriptor aggregator;
+ private int lastBIndex;
+ private final int[] storedKeys;
+ private final int[] keys;
+ private final IBinaryComparator[] comparators;
+ private final FrameTuplePairComparator ftpc;
+ private final ITuplePartitionComputer tpc;
+ private final IAggregatorDescriptor aggregator;
- private final FrameTupleAppender appender;
+ private final FrameTupleAppender appender;
- private final FrameTupleAccessor storedKeysAccessor;
+ private final FrameTupleAccessor storedKeysAccessor;
- private final ArrayTupleBuilder stateTupleBuilder, outputTupleBuilder;
+ private final ArrayTupleBuilder stateTupleBuilder, outputTupleBuilder;
- GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
- IBinaryComparatorFactory[] comparatorFactories,
- ITuplePartitionComputerFactory tpcf,
- IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int tableSize)
- throws HyracksDataException {
- this.ctx = ctx;
+ GroupingHashTable(IHyracksTaskContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
+ ITuplePartitionComputerFactory tpcf, IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize)
+ throws HyracksDataException {
+ this.ctx = ctx;
- buffers = new ArrayList<ByteBuffer>();
- table = new Link[tableSize];
+ buffers = new ArrayList<ByteBuffer>();
+ table = new Link[tableSize];
- keys = fields;
- storedKeys = new int[fields.length];
- @SuppressWarnings("rawtypes")
- ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
- for (int i = 0; i < fields.length; ++i) {
- storedKeys[i] = i;
- storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
- }
+ keys = fields;
+ storedKeys = new int[fields.length];
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
+ for (int i = 0; i < fields.length; ++i) {
+ storedKeys[i] = i;
+ storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
+ }
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
- tpc = tpcf.createPartitioner();
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
+ tpc = tpcf.createPartitioner();
- int[] keyFieldsInPartialResults = new int[fields.length];
- for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- keyFieldsInPartialResults[i] = i;
- }
+ int[] keyFieldsInPartialResults = new int[fields.length];
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+ keyFieldsInPartialResults[i] = i;
+ }
- this.aggregator = aggregatorFactory.createAggregator(ctx,
- inRecordDescriptor, outRecordDescriptor, fields,
- keyFieldsInPartialResults);
+ this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, fields,
+ keyFieldsInPartialResults);
- this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
- accumulatorSize = 0;
+ this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
+ accumulatorSize = 0;
- RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
- storedKeySerDeser);
- storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- storedKeysRecordDescriptor);
- lastBIndex = -1;
+ RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
+ storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
+ lastBIndex = -1;
- appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
- addNewBuffer();
+ addNewBuffer();
- if (fields.length < outRecordDescriptor.getFields().length) {
- stateTupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length);
- } else {
- stateTupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length + 1);
- }
- outputTupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length);
- }
+ if (fields.length < outRecordDescriptor.getFields().length) {
+ stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+ } else {
+ stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
+ }
+ outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+ }
- private void addNewBuffer() {
- ByteBuffer buffer = ctx.allocateFrame();
- buffer.position(0);
- buffer.limit(buffer.capacity());
- buffers.add(buffer);
- appender.reset(buffer, true);
- ++lastBIndex;
- }
+ private void addNewBuffer() {
+ ByteBuffer buffer = ctx.allocateFrame();
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ buffers.add(buffer);
+ appender.reset(buffer, true);
+ ++lastBIndex;
+ }
- void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
- int entry = tpc.partition(accessor, tIndex, table.length);
- Link link = table[entry];
- if (link == null) {
- link = table[entry] = new Link();
- }
- int saIndex = -1;
- for (int i = 0; i < link.size; i += 3) {
- int sbIndex = link.pointers[i];
- int stIndex = link.pointers[i + 1];
- storedKeysAccessor.reset(buffers.get(sbIndex));
- int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
- if (c == 0) {
- saIndex = link.pointers[i + 2];
- break;
- }
- }
- if (saIndex < 0) {
- // Did not find the key. Insert a new entry.
- saIndex = accumulatorSize++;
- // Add keys
+ void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
+ int entry = tpc.partition(accessor, tIndex, table.length);
+ Link link = table[entry];
+ if (link == null) {
+ link = table[entry] = new Link();
+ }
+ int saIndex = -1;
+ for (int i = 0; i < link.size; i += 3) {
+ int sbIndex = link.pointers[i];
+ int stIndex = link.pointers[i + 1];
+ storedKeysAccessor.reset(buffers.get(sbIndex));
+ int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
+ if (c == 0) {
+ saIndex = link.pointers[i + 2];
+ break;
+ }
+ }
+ if (saIndex < 0) {
+ // Did not find the key. Insert a new entry.
+ saIndex = accumulatorSize++;
+ // Add keys
- // Add aggregation fields
- AggregateState newState = aggregator.createAggregateStates();
+ // Add aggregation fields
+ AggregateState newState = aggregator.createAggregateStates();
- stateTupleBuilder.reset();
- for (int k = 0; k < keys.length; k++) {
- stateTupleBuilder.addField(accessor, tIndex, keys[k]);
- }
+ stateTupleBuilder.reset();
+ for (int k = 0; k < keys.length; k++) {
+ stateTupleBuilder.addField(accessor, tIndex, keys[k]);
+ }
- aggregator.init(stateTupleBuilder, accessor, tIndex, newState);
+ aggregator.init(stateTupleBuilder, accessor, tIndex, newState);
- if (!appender.appendSkipEmptyField(
- stateTupleBuilder.getFieldEndOffsets(),
- stateTupleBuilder.getByteArray(), 0,
- stateTupleBuilder.getSize())) {
- addNewBuffer();
- if (!appender.appendSkipEmptyField(
- stateTupleBuilder.getFieldEndOffsets(),
- stateTupleBuilder.getByteArray(), 0,
- stateTupleBuilder.getSize())) {
- throw new HyracksDataException(
- "Cannot init the aggregate state in a single frame.");
- }
- }
+ if (!appender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+ stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
+ addNewBuffer();
+ if (!appender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+ stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
+ throw new HyracksDataException("Cannot init the aggregate state in a single frame.");
+ }
+ }
- if (accumulatorSize >= aggregateStates.length) {
- aggregateStates = Arrays.copyOf(aggregateStates,
- aggregateStates.length * 2);
- }
+ if (accumulatorSize >= aggregateStates.length) {
+ aggregateStates = Arrays.copyOf(aggregateStates, aggregateStates.length * 2);
+ }
- aggregateStates[saIndex] = newState;
+ aggregateStates[saIndex] = newState;
- link.add(lastBIndex, appender.getTupleCount() - 1, saIndex);
+ link.add(lastBIndex, appender.getTupleCount() - 1, saIndex);
- } else {
- aggregator.aggregate(accessor, tIndex, null, 0,
- aggregateStates[saIndex]);
- }
- }
+ } else {
+ aggregator.aggregate(accessor, tIndex, null, 0, aggregateStates[saIndex]);
+ }
+ }
- void write(IFrameWriter writer) throws HyracksDataException {
- ByteBuffer buffer = ctx.allocateFrame();
- appender.reset(buffer, true);
+ void write(IFrameWriter writer) throws HyracksDataException {
+ ByteBuffer buffer = ctx.allocateFrame();
+ appender.reset(buffer, true);
- for (int i = 0; i < table.length; ++i) {
- Link link = table[i];
- if (link != null) {
- for (int j = 0; j < link.size; j += 3) {
- int bIndex = link.pointers[j];
- int tIndex = link.pointers[j + 1];
- int aIndex = link.pointers[j + 2];
- ByteBuffer keyBuffer = buffers.get(bIndex);
- storedKeysAccessor.reset(keyBuffer);
+ for (int i = 0; i < table.length; ++i) {
+ Link link = table[i];
+ if (link != null) {
+ for (int j = 0; j < link.size; j += 3) {
+ int bIndex = link.pointers[j];
+ int tIndex = link.pointers[j + 1];
+ int aIndex = link.pointers[j + 2];
+ ByteBuffer keyBuffer = buffers.get(bIndex);
+ storedKeysAccessor.reset(keyBuffer);
- // copy keys
- outputTupleBuilder.reset();
- for (int k = 0; k < storedKeys.length; k++) {
- outputTupleBuilder.addField(storedKeysAccessor, tIndex,
- storedKeys[k]);
- }
+ // copy keys
+ outputTupleBuilder.reset();
+ for (int k = 0; k < storedKeys.length; k++) {
+ outputTupleBuilder.addField(storedKeysAccessor, tIndex, storedKeys[k]);
+ }
- aggregator
- .outputFinalResult(outputTupleBuilder,
- storedKeysAccessor, tIndex,
- aggregateStates[aIndex]);
+ aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor, tIndex,
+ aggregateStates[aIndex]);
- if (!appender.appendSkipEmptyField(
- outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0,
- outputTupleBuilder.getSize())) {
- writer.nextFrame(buffer);
- appender.reset(buffer, true);
- if (!appender.appendSkipEmptyField(
- outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0,
- outputTupleBuilder.getSize())) {
- throw new HyracksDataException(
- "Cannot write the aggregation output into a frame.");
- }
- }
+ if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ writer.nextFrame(buffer);
+ appender.reset(buffer, true);
+ if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ throw new HyracksDataException("Cannot write the aggregation output into a frame.");
+ }
+ }
- }
- }
- }
- if (appender.getTupleCount() != 0) {
- writer.nextFrame(buffer);
- }
- }
+ }
+ }
+ }
+ if (appender.getTupleCount() != 0) {
+ writer.nextFrame(buffer);
+ }
+ }
- void close() throws HyracksDataException {
- for (AggregateState aState : aggregateStates) {
- aState.close();
- }
- }
+ void close() throws HyracksDataException {
+ for (AggregateState aState : aggregateStates) {
+ aState.close();
+ }
+ }
}
\ No newline at end of file
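
The re-indented Link class above is the heart of the table: each hash bucket is a flat int[] of (bufferIdx, tIndex, accumulatorIdx) triples, doubled on demand so no per-entry node objects are allocated. A standalone sketch of that packed-triple storage, with the probe loop insert() runs over it:

    import java.util.Arrays;

    public class LinkSketch {

        // Triples packed into one int[]: 3 ints per entry, grown by doubling,
        // the same layout as GroupingHashTable.Link.
        static class Link {
            private static final int INIT_POINTERS_SIZE = 9; // room for 3 entries
            int[] pointers = new int[INIT_POINTERS_SIZE];
            int size;

            void add(int bufferIdx, int tIndex, int accumulatorIdx) {
                while (size + 3 > pointers.length) {
                    pointers = Arrays.copyOf(pointers, pointers.length * 2);
                }
                pointers[size++] = bufferIdx;
                pointers[size++] = tIndex;
                pointers[size++] = accumulatorIdx;
            }
        }

        public static void main(String[] args) {
            Link link = new Link();
            for (int i = 0; i < 5; i++) {
                link.add(i, i * 10, i * 100); // forces one doubling (15 ints > 9)
            }
            // Scan the triples the way insert() probes a bucket.
            for (int i = 0; i < link.size; i += 3) {
                System.out.printf("bufferIdx=%d tIndex=%d accumulatorIdx=%d%n",
                        link.pointers[i], link.pointers[i + 1], link.pointers[i + 2]);
            }
        }
    }
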
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index c089dd4..49443d1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -57,10 +57,8 @@
private final int tableSize;
- public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys,
- ITuplePartitionComputerFactory tpcf,
- IBinaryComparatorFactory[] comparatorFactories,
- IAggregatorDescriptorFactory aggregatorFactory,
+ public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys, ITuplePartitionComputerFactory tpcf,
+ IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor outRecordDescriptor, int tableSize) {
super(spec, 1, 1);
this.keys = keys;
@@ -80,12 +78,10 @@
*/
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId,
- HASH_BUILD_ACTIVITY_ID));
+ HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, HASH_BUILD_ACTIVITY_ID));
builder.addActivity(ha);
- OutputActivity oa = new OutputActivity(new ActivityId(odId,
- OUTPUT_ACTIVITY_ID));
+ OutputActivity oa = new OutputActivity(new ActivityId(odId, OUTPUT_ACTIVITY_ID));
builder.addActivity(oa);
builder.addSourceEdge(0, ha, 0);
@@ -122,31 +118,24 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions) {
- final FrameTupleAccessor accessor = new FrameTupleAccessor(
- ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0));
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
return new AbstractUnaryInputSinkOperatorNodePushable() {
private HashBuildActivityState state;
@Override
public void open() throws HyracksDataException {
- state = new HashBuildActivityState(ctx.getJobletContext()
- .getJobId(), new TaskId(getActivityId(), partition));
- state.table = new GroupingHashTable(ctx, keys,
- comparatorFactories, tpcf, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0), recordDescriptors[0],
+ state = new HashBuildActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
+ state.table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
tableSize);
}
@Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
@@ -166,8 +155,7 @@
@Override
public void fail() throws HyracksDataException {
- throw new HyracksDataException(
- "HashGroupOperator is failed.");
+ throw new HyracksDataException("HashGroupOperator is failed.");
}
};
}
@@ -181,17 +169,13 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- HashBuildActivityState buildState = (HashBuildActivityState) ctx
- .getTaskState(new TaskId(new ActivityId(
- getOperatorId(), HASH_BUILD_ACTIVITY_ID),
- partition));
+ HashBuildActivityState buildState = (HashBuildActivityState) ctx.getTaskState(new TaskId(
+ new ActivityId(getOperatorId(), HASH_BUILD_ACTIVITY_ID), partition));
GroupingHashTable table = buildState.table;
writer.open();
try {
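
The two hunks above show the two halves of HashGroup's state handoff: the build activity constructs a GroupingHashTable inside a state object keyed by (ActivityId, partition), and the output activity fetches it back with ctx.getTaskState(new TaskId(...)). A standalone sketch of that handoff (the map below stands in for the runtime's task-state store; publishing through a setter on the context is an assumption, since only the getter appears in this diff):

    import java.util.HashMap;
    import java.util.Map;

    public class TaskStateHandoffSketch {

        // Stand-in for the runtime's task-state store, keyed like
        // new TaskId(new ActivityId(opId, activityId), partition).
        static final Map<String, Object> TASK_STATES = new HashMap<String, Object>();

        static String taskId(int activityId, int partition) {
            return activityId + ":" + partition;
        }

        // Build activity: aggregate the partition's input, then publish the
        // table under this task's id (the assumed analogue of setTaskState).
        static void hashBuild(int partition, int[] input) {
            Map<Integer, Integer> table = new HashMap<Integer, Integer>(); // toy hash table
            for (int v : input) {
                Integer c = table.get(v);
                table.put(v, c == null ? 1 : c + 1);
            }
            TASK_STATES.put(taskId(0, partition), table);
        }

        // Output activity: fetch the same partition's table, as the
        // ctx.getTaskState(...) call above does, and drain it downstream.
        @SuppressWarnings("unchecked")
        static void output(int partition) {
            Map<Integer, Integer> table = (Map<Integer, Integer>) TASK_STATES.get(taskId(0, partition));
            for (Map.Entry<Integer, Integer> e : table.entrySet()) {
                System.out.println("key=" + e.getKey() + " count=" + e.getValue());
            }
        }

        public static void main(String[] args) {
            hashBuild(0, new int[] { 1, 2, 2, 3, 3, 3 });
            output(0);
        }
    }
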
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 29bd083..831e497 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -44,312 +44,265 @@
*/
public class HashSpillableTableFactory implements ISpillableTableFactory {
- private static final long serialVersionUID = 1L;
- private final ITuplePartitionComputerFactory tpcf;
- private final int tableSize;
+ private static final long serialVersionUID = 1L;
+ private final ITuplePartitionComputerFactory tpcf;
+ private final int tableSize;
- public HashSpillableTableFactory(ITuplePartitionComputerFactory tpcf,
- int tableSize) {
- this.tpcf = tpcf;
- this.tableSize = tableSize;
- }
+ public HashSpillableTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize) {
+ this.tpcf = tpcf;
+ this.tableSize = tableSize;
+ }
- /*
- * (non-Javadoc)
- *
- * @see
- * edu.uci.ics.hyracks.dataflow.std.aggregations.ISpillableTableFactory#
- * buildSpillableTable(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
- * int[], edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory[],
- * edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory,
- * edu.
- * uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory
- * [], edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
- * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int)
- */
- @Override
- public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx,
- final int[] keyFields,
- IBinaryComparatorFactory[] comparatorFactories,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory,
- IAggregatorDescriptorFactory aggregateFactory,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, final int framesLimit)
- throws HyracksDataException {
- final int[] storedKeys = new int[keyFields.length];
- @SuppressWarnings("rawtypes")
- ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
- for (int i = 0; i < keyFields.length; i++) {
- storedKeys[i] = i;
- storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
- }
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * edu.uci.ics.hyracks.dataflow.std.aggregations.ISpillableTableFactory#
+ * buildSpillableTable(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+ * int[], edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory[],
+ * edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory,
+ * edu.
+ * uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory
+ * [], edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int)
+ */
+ @Override
+ public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, final int[] keyFields,
+ IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, final int framesLimit) throws HyracksDataException {
+ final int[] storedKeys = new int[keyFields.length];
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
+ for (int i = 0; i < keyFields.length; i++) {
+ storedKeys[i] = i;
+ storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
+ }
- RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
- final FrameTupleAccessor storedKeysAccessor1;
- final FrameTupleAccessor storedKeysAccessor2;
- if (keyFields.length >= outRecordDescriptor.getFields().length) {
- // for the case of zero-aggregations
- ISerializerDeserializer<?>[] fields = outRecordDescriptor
- .getFields();
- ITypeTraits[] types = outRecordDescriptor.getTypeTraits();
- ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
- for (int i = 0; i < fields.length; i++)
- newFields[i] = fields[i];
- ITypeTraits[] newTypes = null;
- if (types != null) {
- newTypes = new ITypeTraits[types.length + 1];
- for (int i = 0; i < types.length; i++)
- newTypes[i] = types[i];
- }
- internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
- }
- storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
- internalRecordDescriptor);
- storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(),
- internalRecordDescriptor);
+ RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
+ final FrameTupleAccessor storedKeysAccessor1;
+ final FrameTupleAccessor storedKeysAccessor2;
+ if (keyFields.length >= outRecordDescriptor.getFields().length) {
+ // for the case of zero-aggregations
+ ISerializerDeserializer<?>[] fields = outRecordDescriptor.getFields();
+ ITypeTraits[] types = outRecordDescriptor.getTypeTraits();
+ ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
+ for (int i = 0; i < fields.length; i++)
+ newFields[i] = fields[i];
+ ITypeTraits[] newTypes = null;
+ if (types != null) {
+ newTypes = new ITypeTraits[types.length + 1];
+ for (int i = 0; i < types.length; i++)
+ newTypes[i] = types[i];
+ }
+ internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
+ }
+ storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
+ storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
- final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(
- keyFields, storedKeys, comparators);
+ final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(keyFields, storedKeys, comparators);
- final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(
- storedKeys, storedKeys, comparators);
+ final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(storedKeys, storedKeys, comparators);
- final ITuplePartitionComputer tpc = tpcf.createPartitioner();
+ final ITuplePartitionComputer tpc = tpcf.createPartitioner();
- final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null
- : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
+ .createNormalizedKeyComputer();
- int[] keyFieldsInPartialResults = new int[keyFields.length];
- for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- keyFieldsInPartialResults[i] = i;
- }
+ int[] keyFieldsInPartialResults = new int[keyFields.length];
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+ keyFieldsInPartialResults[i] = i;
+ }
- final IAggregatorDescriptor aggregator = aggregateFactory
- .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
- keyFields, keyFieldsInPartialResults);
+ final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
+ outRecordDescriptor, keyFields, keyFieldsInPartialResults);
- final AggregateState aggregateState = aggregator
- .createAggregateStates();
+ final AggregateState aggregateState = aggregator.createAggregateStates();
final ArrayTupleBuilder stateTupleBuilder;
if (keyFields.length < outRecordDescriptor.getFields().length) {
- stateTupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length);
+ stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
} else {
- stateTupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length + 1);
+ stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
}
- final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length);
+ final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
return new ISpillableTable() {
private int lastBufIndex;
- private ByteBuffer outputFrame;
+ private ByteBuffer outputFrame;
private FrameTupleAppender outputAppender;
- private FrameTupleAppender stateAppender = new FrameTupleAppender(
- ctx.getFrameSize());
+ private FrameTupleAppender stateAppender = new FrameTupleAppender(ctx.getFrameSize());
- private final ISerializableTable table = new SerializableHashTable(
- tableSize, ctx);
+ private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);
private final TuplePointer storedTuplePointer = new TuplePointer();
private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
- /**
- * A tuple is "pointed" to by 3 entries in the tPointers array. [0]
- * = Frame index in the "Frames" list, [1] = Tuple index in the
- * frame, [2] = Poor man's normalized key for the tuple.
- */
- private int[] tPointers;
+ /**
+ * A tuple is "pointed" to by 3 entries in the tPointers array. [0]
+ * = Frame index in the "Frames" list, [1] = Tuple index in the
+ * frame, [2] = Poor man's normalized key for the tuple.
+ */
+ private int[] tPointers;
- @Override
- public void sortFrames() {
- int sfIdx = storedKeys[0];
- int totalTCount = table.getTupleCount();
- tPointers = new int[totalTCount * 3];
- int ptr = 0;
+ @Override
+ public void sortFrames() {
+ int sfIdx = storedKeys[0];
+ int totalTCount = table.getTupleCount();
+ tPointers = new int[totalTCount * 3];
+ int ptr = 0;
- for (int i = 0; i < tableSize; i++) {
- int entry = i;
- int offset = 0;
- do {
- table.getTuplePointer(entry, offset, storedTuplePointer);
- if (storedTuplePointer.frameIndex < 0)
- break;
- tPointers[ptr * 3] = entry;
- tPointers[ptr * 3 + 1] = offset;
- table.getTuplePointer(entry, offset, storedTuplePointer);
- int fIndex = storedTuplePointer.frameIndex;
- int tIndex = storedTuplePointer.tupleIndex;
- storedKeysAccessor1.reset(frames.get(fIndex));
- int tStart = storedKeysAccessor1
- .getTupleStartOffset(tIndex);
- int f0StartRel = storedKeysAccessor1
- .getFieldStartOffset(tIndex, sfIdx);
- int f0EndRel = storedKeysAccessor1.getFieldEndOffset(
- tIndex, sfIdx);
- int f0Start = f0StartRel + tStart
- + storedKeysAccessor1.getFieldSlotsLength();
- tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc
- .normalize(storedKeysAccessor1.getBuffer()
- .array(), f0Start, f0EndRel
- - f0StartRel);
- ptr++;
- offset++;
- } while (true);
- }
- /**
- * Sort using quick sort
- */
- if (tPointers.length > 0) {
- sort(tPointers, 0, totalTCount);
- }
- }
+ for (int i = 0; i < tableSize; i++) {
+ int entry = i;
+ int offset = 0;
+ do {
+ table.getTuplePointer(entry, offset, storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ tPointers[ptr * 3] = entry;
+ tPointers[ptr * 3 + 1] = offset;
+ table.getTuplePointer(entry, offset, storedTuplePointer);
+ int fIndex = storedTuplePointer.frameIndex;
+ int tIndex = storedTuplePointer.tupleIndex;
+ storedKeysAccessor1.reset(frames.get(fIndex));
+ int tStart = storedKeysAccessor1.getTupleStartOffset(tIndex);
+ int f0StartRel = storedKeysAccessor1.getFieldStartOffset(tIndex, sfIdx);
+ int f0EndRel = storedKeysAccessor1.getFieldEndOffset(tIndex, sfIdx);
+ int f0Start = f0StartRel + tStart + storedKeysAccessor1.getFieldSlotsLength();
+ tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc.normalize(storedKeysAccessor1.getBuffer()
+ .array(), f0Start, f0EndRel - f0StartRel);
+ ptr++;
+ offset++;
+ } while (true);
+ }
+ /**
+ * Sort using quick sort
+ */
+ if (tPointers.length > 0) {
+ sort(tPointers, 0, totalTCount);
+ }
+ }
- @Override
- public void reset() {
- lastBufIndex = -1;
- tPointers = null;
- table.reset();
- aggregator.reset();
- }
+ @Override
+ public void reset() {
+ lastBufIndex = -1;
+ tPointers = null;
+ table.reset();
+ aggregator.reset();
+ }
- @Override
- public boolean insert(FrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- if (lastBufIndex < 0)
- nextAvailableFrame();
- int entry = tpc.partition(accessor, tIndex, tableSize);
- boolean foundGroup = false;
- int offset = 0;
- do {
- table.getTuplePointer(entry, offset++, storedTuplePointer);
- if (storedTuplePointer.frameIndex < 0)
- break;
- storedKeysAccessor1.reset(frames
- .get(storedTuplePointer.frameIndex));
- int c = ftpcPartial.compare(accessor, tIndex,
- storedKeysAccessor1, storedTuplePointer.tupleIndex);
- if (c == 0) {
- foundGroup = true;
- break;
- }
- } while (true);
+ @Override
+ public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ if (lastBufIndex < 0)
+ nextAvailableFrame();
+ int entry = tpc.partition(accessor, tIndex, tableSize);
+ boolean foundGroup = false;
+ int offset = 0;
+ do {
+ table.getTuplePointer(entry, offset++, storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));
+ int c = ftpcPartial.compare(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex);
+ if (c == 0) {
+ foundGroup = true;
+ break;
+ }
+ } while (true);
- if (!foundGroup) {
+ if (!foundGroup) {
stateTupleBuilder.reset();
- aggregator.init(stateTupleBuilder, accessor, tIndex,
- aggregateState);
- if (!stateAppender.appendSkipEmptyField(
- stateTupleBuilder.getFieldEndOffsets(),
- stateTupleBuilder.getByteArray(), 0,
- stateTupleBuilder.getSize())) {
+ aggregator.init(stateTupleBuilder, accessor, tIndex, aggregateState);
+ if (!stateAppender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+ stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
if (!nextAvailableFrame()) {
return false;
}
- if (!stateAppender.appendSkipEmptyField(
- stateTupleBuilder.getFieldEndOffsets(),
- stateTupleBuilder.getByteArray(), 0,
- stateTupleBuilder.getSize())) {
- throw new HyracksDataException(
- "Cannot init external aggregate state in a frame.");
+ if (!stateAppender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+ stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
+ throw new HyracksDataException("Cannot init external aggregate state in a frame.");
}
}
storedTuplePointer.frameIndex = lastBufIndex;
- storedTuplePointer.tupleIndex = stateAppender
- .getTupleCount() - 1;
+ storedTuplePointer.tupleIndex = stateAppender.getTupleCount() - 1;
table.insert(entry, storedTuplePointer);
} else {
- aggregator.aggregate(accessor, tIndex, storedKeysAccessor1,
- storedTuplePointer.tupleIndex, aggregateState);
+ aggregator.aggregate(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex,
+ aggregateState);
- }
- return true;
- }
+ }
+ return true;
+ }
- @Override
- public List<ByteBuffer> getFrames() {
- return frames;
- }
+ @Override
+ public List<ByteBuffer> getFrames() {
+ return frames;
+ }
- @Override
- public int getFrameCount() {
- return lastBufIndex;
- }
+ @Override
+ public int getFrameCount() {
+ return lastBufIndex;
+ }
- @Override
- public void flushFrames(IFrameWriter writer, boolean isPartial)
- throws HyracksDataException {
- if (outputFrame == null) {
- outputFrame = ctx.allocateFrame();
- }
+ @Override
+ public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
+ if (outputFrame == null) {
+ outputFrame = ctx.allocateFrame();
+ }
if (outputAppender == null) {
- outputAppender = new FrameTupleAppender(
- outputFrame.capacity());
+ outputAppender = new FrameTupleAppender(outputFrame.capacity());
}
outputAppender.reset(outputFrame, true);
writer.open();
- if (tPointers == null) {
- // Not sorted
- for (int i = 0; i < tableSize; ++i) {
- int entry = i;
- int offset = 0;
- do {
- table.getTuplePointer(entry, offset++,
- storedTuplePointer);
- if (storedTuplePointer.frameIndex < 0)
- break;
- int bIndex = storedTuplePointer.frameIndex;
- int tIndex = storedTuplePointer.tupleIndex;
+ if (tPointers == null) {
+ // Not sorted
+ for (int i = 0; i < tableSize; ++i) {
+ int entry = i;
+ int offset = 0;
+ do {
+ table.getTuplePointer(entry, offset++, storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ int bIndex = storedTuplePointer.frameIndex;
+ int tIndex = storedTuplePointer.tupleIndex;
- storedKeysAccessor1.reset(frames.get(bIndex));
+ storedKeysAccessor1.reset(frames.get(bIndex));
outputTupleBuilder.reset();
- if (isPartial) {
+ if (isPartial) {
- aggregator.outputPartialResult(
- outputTupleBuilder,
- storedKeysAccessor1, tIndex,
+ aggregator.outputPartialResult(outputTupleBuilder, storedKeysAccessor1, tIndex,
aggregateState);
} else {
- aggregator.outputFinalResult(
- outputTupleBuilder,
- storedKeysAccessor1, tIndex,
+ aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor1, tIndex,
aggregateState);
}
- if (!outputAppender.appendSkipEmptyField(
- outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0,
- outputTupleBuilder.getSize())) {
+ if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
FrameUtils.flushFrame(outputFrame, writer);
outputAppender.reset(outputFrame, true);
- if (!outputAppender
- .appendSkipEmptyField(
- outputTupleBuilder
- .getFieldEndOffsets(),
- outputTupleBuilder
- .getByteArray(), 0,
- outputTupleBuilder.getSize())) {
+ if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
throw new HyracksDataException(
"The output item is too large to be fit into a frame.");
}
@@ -368,8 +321,7 @@
for (int ptr = 0; ptr < n; ptr++) {
int tableIndex = tPointers[ptr * 3];
int rowIndex = tPointers[ptr * 3 + 1];
- table.getTuplePointer(tableIndex, rowIndex,
- storedTuplePointer);
+ table.getTuplePointer(tableIndex, rowIndex, storedTuplePointer);
int frameIndex = storedTuplePointer.frameIndex;
int tupleIndex = storedTuplePointer.tupleIndex;
// Get the frame containing the value
@@ -378,33 +330,24 @@
outputTupleBuilder.reset();
- if (isPartial) {
+ if (isPartial) {
- aggregator
- .outputPartialResult(outputTupleBuilder,
- storedKeysAccessor1, tupleIndex,
- aggregateState);
+ aggregator.outputPartialResult(outputTupleBuilder, storedKeysAccessor1, tupleIndex,
+ aggregateState);
} else {
- aggregator
- .outputFinalResult(outputTupleBuilder,
- storedKeysAccessor1, tupleIndex,
- aggregateState);
+ aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor1, tupleIndex,
+ aggregateState);
}
- if (!outputAppender.appendSkipEmptyField(
- outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0,
- outputTupleBuilder.getSize())) {
+ if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
FrameUtils.flushFrame(outputFrame, writer);
outputAppender.reset(outputFrame, true);
- if (!outputAppender.appendSkipEmptyField(
- outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0,
- outputTupleBuilder.getSize())) {
- throw new HyracksDataException(
- "The output item is too large to be fit into a frame.");
+ if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ throw new HyracksDataException("The output item is too large to be fit into a frame.");
}
}
}
@@ -415,27 +358,27 @@
aggregator.close();
}
- @Override
- public void close() {
- lastBufIndex = -1;
- tPointers = null;
- table.close();
- frames.clear();
- aggregateState.close();
- }
+ @Override
+ public void close() {
+ lastBufIndex = -1;
+ tPointers = null;
+ table.close();
+ frames.clear();
+ aggregateState.close();
+ }
- /**
- * Set the working frame to the next available frame in the frame
- * list. There are two cases:<br>
- * 1) If the next frame is not initialized, allocate a new frame. 2)
- * When frames are already created, they are recycled.
- *
- * @return Whether a new frame is added successfully.
- */
- private boolean nextAvailableFrame() {
- // Return false if the number of frames is equal to the limit.
- if (lastBufIndex + 1 >= framesLimit)
- return false;
+ /**
+ * Set the working frame to the next available frame in the frame
+ * list. There are two cases:<br>
+ * 1) If the next frame is not initialized, allocate a new frame. 2)
+ * When frames are already created, they are recycled.
+ *
+ * @return Whether a new frame is added successfully.
+ */
+ private boolean nextAvailableFrame() {
+ // Return false if the number of frames is equal to the limit.
+ if (lastBufIndex + 1 >= framesLimit)
+ return false;
if (frames.size() < framesLimit) {
// Insert a new frame
@@ -456,107 +399,101 @@
return true;
}
- private void sort(int[] tPointers, int offset, int length) {
- int m = offset + (length >> 1);
- int mTable = tPointers[m * 3];
- int mRow = tPointers[m * 3 + 1];
- int mNormKey = tPointers[m * 3 + 2];
+ private void sort(int[] tPointers, int offset, int length) {
+ int m = offset + (length >> 1);
+ int mTable = tPointers[m * 3];
+ int mRow = tPointers[m * 3 + 1];
+ int mNormKey = tPointers[m * 3 + 2];
- table.getTuplePointer(mTable, mRow, storedTuplePointer);
- int mFrame = storedTuplePointer.frameIndex;
- int mTuple = storedTuplePointer.tupleIndex;
- storedKeysAccessor1.reset(frames.get(mFrame));
+ table.getTuplePointer(mTable, mRow, storedTuplePointer);
+ int mFrame = storedTuplePointer.frameIndex;
+ int mTuple = storedTuplePointer.tupleIndex;
+ storedKeysAccessor1.reset(frames.get(mFrame));
- int a = offset;
- int b = a;
- int c = offset + length - 1;
- int d = c;
- while (true) {
- while (b <= c) {
- int bTable = tPointers[b * 3];
- int bRow = tPointers[b * 3 + 1];
- int bNormKey = tPointers[b * 3 + 2];
- int cmp = 0;
- if (bNormKey != mNormKey) {
- cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1
- : 1;
- } else {
- table.getTuplePointer(bTable, bRow,
- storedTuplePointer);
- int bFrame = storedTuplePointer.frameIndex;
- int bTuple = storedTuplePointer.tupleIndex;
- storedKeysAccessor2.reset(frames.get(bFrame));
- cmp = ftpcTuple.compare(storedKeysAccessor2,
- bTuple, storedKeysAccessor1, mTuple);
- }
- if (cmp > 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, a++, b);
- }
- ++b;
- }
- while (c >= b) {
- int cTable = tPointers[c * 3];
- int cRow = tPointers[c * 3 + 1];
- int cNormKey = tPointers[c * 3 + 2];
- int cmp = 0;
- if (cNormKey != mNormKey) {
- cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1
- : 1;
- } else {
- table.getTuplePointer(cTable, cRow,
- storedTuplePointer);
- int cFrame = storedTuplePointer.frameIndex;
- int cTuple = storedTuplePointer.tupleIndex;
- storedKeysAccessor2.reset(frames.get(cFrame));
- cmp = ftpcTuple.compare(storedKeysAccessor2,
- cTuple, storedKeysAccessor1, mTuple);
- }
- if (cmp < 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, c, d--);
- }
- --c;
- }
- if (b > c)
- break;
- swap(tPointers, b++, c--);
- }
+ int a = offset;
+ int b = a;
+ int c = offset + length - 1;
+ int d = c;
+ while (true) {
+ while (b <= c) {
+ int bTable = tPointers[b * 3];
+ int bRow = tPointers[b * 3 + 1];
+ int bNormKey = tPointers[b * 3 + 2];
+ int cmp = 0;
+ if (bNormKey != mNormKey) {
+ cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+ } else {
+ table.getTuplePointer(bTable, bRow, storedTuplePointer);
+ int bFrame = storedTuplePointer.frameIndex;
+ int bTuple = storedTuplePointer.tupleIndex;
+ storedKeysAccessor2.reset(frames.get(bFrame));
+ cmp = ftpcTuple.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
+ }
+ if (cmp > 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, a++, b);
+ }
+ ++b;
+ }
+ while (c >= b) {
+ int cTable = tPointers[c * 3];
+ int cRow = tPointers[c * 3 + 1];
+ int cNormKey = tPointers[c * 3 + 2];
+ int cmp = 0;
+ if (cNormKey != mNormKey) {
+ cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+ } else {
+ table.getTuplePointer(cTable, cRow, storedTuplePointer);
+ int cFrame = storedTuplePointer.frameIndex;
+ int cTuple = storedTuplePointer.tupleIndex;
+ storedKeysAccessor2.reset(frames.get(cFrame));
+ cmp = ftpcTuple.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
+ }
+ if (cmp < 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, c, d--);
+ }
+ --c;
+ }
+ if (b > c)
+ break;
+ swap(tPointers, b++, c--);
+ }
- int s;
- int n = offset + length;
- s = Math.min(a - offset, b - a);
- vecswap(tPointers, offset, b - s, s);
- s = Math.min(d - c, n - d - 1);
- vecswap(tPointers, b, n - s, s);
+ int s;
+ int n = offset + length;
+ s = Math.min(a - offset, b - a);
+ vecswap(tPointers, offset, b - s, s);
+ s = Math.min(d - c, n - d - 1);
+ vecswap(tPointers, b, n - s, s);
- if ((s = b - a) > 1) {
- sort(tPointers, offset, s);
- }
- if ((s = d - c) > 1) {
- sort(tPointers, n - s, s);
- }
- }
+ if ((s = b - a) > 1) {
+ sort(tPointers, offset, s);
+ }
+ if ((s = d - c) > 1) {
+ sort(tPointers, n - s, s);
+ }
+ }
- private void swap(int x[], int a, int b) {
- for (int i = 0; i < 3; ++i) {
- int t = x[a * 3 + i];
- x[a * 3 + i] = x[b * 3 + i];
- x[b * 3 + i] = t;
- }
- }
+ private void swap(int x[], int a, int b) {
+ for (int i = 0; i < 3; ++i) {
+ int t = x[a * 3 + i];
+ x[a * 3 + i] = x[b * 3 + i];
+ x[b * 3 + i] = t;
+ }
+ }
- private void vecswap(int x[], int a, int b, int n) {
- for (int i = 0; i < n; i++, a++, b++) {
- swap(x, a, b);
- }
- }
+ private void vecswap(int x[], int a, int b, int n) {
+ for (int i = 0; i < n; i++, a++, b++) {
+ swap(x, a, b);
+ }
+ }
- };
- }
+ };
+ }
}
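
The sort(...) body above hinges on one trick worth isolating: the 32-bit normalized key stored in tPointers[ptr * 3 + 2] is compared as an unsigned integer, and the full FrameTuplePairComparator runs only on ties (or everywhere, when no normalizer is supplied and every stored key is 0). A self-contained sketch of that ordering; the class and method names are illustrative, not part of this patch:

    // Unsigned comparison of 32-bit normalized keys, mirroring sort() above:
    // differing keys decide the order; equal keys force a full tuple compare.
    final class NormKeyOrder {
        static int compareNormKeys(int k1, int k2) {
            if (k1 == k2) {
                return 0; // tie: caller falls back to the tuple comparator
            }
            return ((((long) k1) & 0xffffffffL) < (((long) k2) & 0xffffffffL)) ? -1 : 1;
        }

        public static void main(String[] args) {
            System.out.println(compareNormKeys(-1, 1)); // 1: -1 is 0xffffffff unsigned
        }
    }
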
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index 329eb4b..2cf978c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -42,8 +42,8 @@
* The state to be initialized.
* @throws HyracksDataException
*/
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
- int tIndex, AggregateState state) throws HyracksDataException;
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException;
/**
* Reset the aggregator. The corresponding aggregate state should be reset
@@ -69,9 +69,8 @@
* The aggregate state.
* @throws HyracksDataException
*/
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException;
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException;
/**
* Output the partial aggregation result.
@@ -85,9 +84,8 @@
* The aggregation state.
* @throws HyracksDataException
*/
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException;
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException;
/**
* Output the final aggregation result.
@@ -101,9 +99,8 @@
* The aggregation state.
* @throws HyracksDataException
*/
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException;
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException;
public void close();
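
The reformatted interface above fixes the calling convention the group-by operators in this patch rely on: init(...) on the first tuple of a group, aggregate(...) on every further tuple, then exactly one of outputPartialResult(...) or outputFinalResult(...). A toy, dependency-free illustration of that lifecycle; the class and its members are ours, not the Hyracks API (an int stands in for AggregateState, println for the ArrayTupleBuilder output path):

    final class SumLifecycle {
        private int sum; // the "aggregate state"

        void init(int firstValue) { sum = firstValue; }  // first tuple of a group
        void aggregate(int value) { sum += value; }      // each later tuple
        int outputFinalResult()   { return sum; }        // once per group

        public static void main(String[] args) {
            SumLifecycle g = new SumLifecycle();
            g.init(3);
            g.aggregate(4);
            g.aggregate(5);
            System.out.println(g.outputFinalResult()); // 12
        }
    }
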
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index 339c29f..9ff915d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -25,8 +25,8 @@
*/
public interface IAggregatorDescriptorFactory extends Serializable {
- IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults) throws HyracksDataException;
-
+ IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
+ throws HyracksDataException;
+
}
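
IAggregatorDescriptorFactory extends Serializable for a reason the one-method interface only hints at: presumably the factory travels with the serialized job specification to each worker, while the aggregator it creates is built locally per partition and never crosses the wire. A hedged sketch of that split; every name below is illustrative, not Hyracks API:

    import java.io.Serializable;

    // The factory is the object that ships with the job; what it creates does not.
    interface DemoAggregator { void add(int v); int result(); }

    interface DemoAggregatorFactory extends Serializable {
        DemoAggregator createAggregator(); // called on the worker, per partition
    }

    final class IntSumFactory implements DemoAggregatorFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public DemoAggregator createAggregator() {
            return new DemoAggregator() {
                private int sum;
                public void add(int v) { sum += v; }
                public int result()    { return sum; }
            };
        }
    }
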
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index 32f4365..a34fa73 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -41,8 +41,7 @@
* The state to be initialized.
* @throws HyracksDataException
*/
- public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, AggregateState state)
+ public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
throws HyracksDataException;
/**
@@ -74,8 +73,7 @@
* The aggregate state.
* @throws HyracksDataException
*/
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, AggregateState state)
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, AggregateState state)
throws HyracksDataException;
/**
@@ -90,8 +88,8 @@
* The aggregation state.
* @throws HyracksDataException
*/
- public void outputPartialResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException;
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+ throws HyracksDataException;
/**
* Output the final aggregation result.
@@ -105,8 +103,8 @@
* The aggregation state.
* @throws HyracksDataException
*/
- public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException;
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+ throws HyracksDataException;
public boolean needsBinaryState();
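
needsBinaryState() and its object-state counterpart split every field aggregator in this patch into two storage modes: binary state is packed into the grouping frame at a byte offset, while object state is an ordinary heap value hung off AggregateState.state. A compact sketch of the two modes; the class is illustrative, not the Hyracks implementation:

    import java.nio.ByteBuffer;

    // Two ways to hold the same running sum, mirroring the useObjectState flag.
    final class StateModes {
        public static void main(String[] args) {
            // Binary state: an int living at a fixed offset inside a frame.
            byte[] frame = new byte[64];
            int offset = 16;
            ByteBuffer buf = ByteBuffer.wrap(frame);
            buf.putInt(offset, buf.getInt(offset) + 7);

            // Object state: the same value boxed on the heap.
            Object state = Integer.valueOf(0);
            state = (Integer) state + 7;

            System.out.println(buf.getInt(offset) + " " + state); // 7 7
        }
    }
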
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
index ee35c5e..6f50597 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
@@ -25,8 +25,7 @@
*/
public interface IFieldAggregateDescriptorFactory extends Serializable {
- public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor) throws HyracksDataException;
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor) throws HyracksDataException;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index 17ad5f0..c718ee4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -29,17 +29,15 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-public class PreclusteredGroupOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
+public class PreclusteredGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private final int[] groupFields;
private final IBinaryComparatorFactory[] comparatorFactories;
private final IAggregatorDescriptorFactory aggregatorFactory;
private static final long serialVersionUID = 1L;
- public PreclusteredGroupOperatorDescriptor(JobSpecification spec,
- int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
- IAggregatorDescriptorFactory aggregatorFactory,
+ public PreclusteredGroupOperatorDescriptor(JobSpecification spec, int[] groupFields,
+ IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor recordDescriptor) {
super(spec, 1, 1);
this.groupFields = groupFields;
@@ -49,40 +47,33 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) throws HyracksDataException {
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- final RecordDescriptor inRecordDesc = recordDescProvider
- .getInputRecordDescriptor(getOperatorId(), 0);
- final IAggregatorDescriptor aggregator = aggregatorFactory
- .createAggregator(ctx, inRecordDesc, recordDescriptors[0],
- groupFields, groupFields);
+ final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+ recordDescriptors[0], groupFields, groupFields);
final ByteBuffer copyFrame = ctx.allocateFrame();
- final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(
- ctx.getFrameSize(), inRecordDesc);
+ final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
copyFrameAccessor.reset(copyFrame);
ByteBuffer outFrame = ctx.allocateFrame();
- final FrameTupleAppender appender = new FrameTupleAppender(
- ctx.getFrameSize());
+ final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(outFrame, true);
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private PreclusteredGroupWriter pgw;
@Override
public void open() throws HyracksDataException {
- pgw = new PreclusteredGroupWriter(ctx, groupFields,
- comparators, aggregator, inRecordDesc, recordDescriptors[0], writer);
+ pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
+ recordDescriptors[0], writer);
pgw.open();
}
@Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
pgw.nextFrame(buffer);
}
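
The operator above is push-based: the runtime calls open() once, nextFrame(...) for every buffer of input, and close() (or fail()) at the end, and the PreclusteredGroupWriter in the next file receives the same calls downstream. A minimal stand-in for that contract, shaped like IFrameWriter but written with illustrative names only:

    import java.nio.ByteBuffer;

    // Skeleton of the push contract: open, zero or more nextFrame calls, close.
    final class CountingFrameSink {
        private int frames;

        void open()                    { frames = 0; }
        void nextFrame(ByteBuffer buf) { frames++; } // one call per input buffer
        void close()                   { System.out.println(frames + " frames seen"); }

        public static void main(String[] args) {
            CountingFrameSink sink = new CountingFrameSink();
            sink.open();
            sink.nextFrame(ByteBuffer.allocate(4));
            sink.nextFrame(ByteBuffer.allocate(4));
            sink.close(); // prints: 2 frames seen
        }
    }
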
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
index ee8489a..7e4baac 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -27,157 +27,138 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class PreclusteredGroupWriter implements IFrameWriter {
- private final int[] groupFields;
- private final IBinaryComparator[] comparators;
- private final IAggregatorDescriptor aggregator;
- private final AggregateState aggregateState;
- private final IFrameWriter writer;
- private final ByteBuffer copyFrame;
- private final FrameTupleAccessor inFrameAccessor;
- private final FrameTupleAccessor copyFrameAccessor;
+ private final int[] groupFields;
+ private final IBinaryComparator[] comparators;
+ private final IAggregatorDescriptor aggregator;
+ private final AggregateState aggregateState;
+ private final IFrameWriter writer;
+ private final ByteBuffer copyFrame;
+ private final FrameTupleAccessor inFrameAccessor;
+ private final FrameTupleAccessor copyFrameAccessor;
- private final ByteBuffer outFrame;
- private final FrameTupleAppender appender;
- private final ArrayTupleBuilder tupleBuilder;
+ private final ByteBuffer outFrame;
+ private final FrameTupleAppender appender;
+ private final ArrayTupleBuilder tupleBuilder;
- private boolean first;
+ private boolean first;
- public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields,
- IBinaryComparator[] comparators, IAggregatorDescriptor aggregator,
- RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
- IFrameWriter writer) {
- this.groupFields = groupFields;
- this.comparators = comparators;
- this.aggregator = aggregator;
- this.aggregateState = aggregator.createAggregateStates();
- this.writer = writer;
- copyFrame = ctx.allocateFrame();
- inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- inRecordDesc);
- copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- inRecordDesc);
- copyFrameAccessor.reset(copyFrame);
+ public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+ IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
+ IFrameWriter writer) {
+ this.groupFields = groupFields;
+ this.comparators = comparators;
+ this.aggregator = aggregator;
+ this.aggregateState = aggregator.createAggregateStates();
+ this.writer = writer;
+ copyFrame = ctx.allocateFrame();
+ inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+ copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+ copyFrameAccessor.reset(copyFrame);
- outFrame = ctx.allocateFrame();
- appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(outFrame, true);
+ outFrame = ctx.allocateFrame();
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(outFrame, true);
- tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
- }
+ tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
+ }
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- first = true;
- }
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ first = true;
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- inFrameAccessor.reset(buffer);
- int nTuples = inFrameAccessor.getTupleCount();
- for (int i = 0; i < nTuples; ++i) {
- if (first) {
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inFrameAccessor.reset(buffer);
+ int nTuples = inFrameAccessor.getTupleCount();
+ for (int i = 0; i < nTuples; ++i) {
+ if (first) {
- tupleBuilder.reset();
- for (int j = 0; j < groupFields.length; j++) {
- tupleBuilder.addField(inFrameAccessor, i, j);
- }
- aggregator.init(tupleBuilder, inFrameAccessor, i,
- aggregateState);
+ tupleBuilder.reset();
+ for (int j = 0; j < groupFields.length; j++) {
+ tupleBuilder.addField(inFrameAccessor, i, j);
+ }
+ aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
- first = false;
+ first = false;
- } else {
- if (i == 0) {
- switchGroupIfRequired(copyFrameAccessor,
- copyFrameAccessor.getTupleCount() - 1,
- inFrameAccessor, i);
- } else {
- switchGroupIfRequired(inFrameAccessor, i - 1,
- inFrameAccessor, i);
- }
+ } else {
+ if (i == 0) {
+ switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+ } else {
+ switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+ }
- }
- }
- FrameUtils.copy(buffer, copyFrame);
- }
+ }
+ }
+ FrameUtils.copy(buffer, copyFrame);
+ }
- private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor,
- int prevTupleIndex, FrameTupleAccessor currTupleAccessor,
- int currTupleIndex) throws HyracksDataException {
- if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor,
- currTupleIndex)) {
- writeOutput(prevTupleAccessor, prevTupleIndex);
+ private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+ FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+ if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+ writeOutput(prevTupleAccessor, prevTupleIndex);
- tupleBuilder.reset();
- for (int j = 0; j < groupFields.length; j++) {
- tupleBuilder.addField(currTupleAccessor, currTupleIndex, j);
- }
- aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex,
- aggregateState);
- } else {
- aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0,
- aggregateState);
- }
- }
+ tupleBuilder.reset();
+ for (int j = 0; j < groupFields.length; j++) {
+ tupleBuilder.addField(currTupleAccessor, currTupleIndex, j);
+ }
+ aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex, aggregateState);
+ } else {
+ aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
+ }
+ }
- private void writeOutput(final FrameTupleAccessor lastTupleAccessor,
- int lastTupleIndex) throws HyracksDataException {
+ private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+ throws HyracksDataException {
- tupleBuilder.reset();
- for (int j = 0; j < groupFields.length; j++) {
- tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, j);
- }
- aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor,
- lastTupleIndex, aggregateState);
+ tupleBuilder.reset();
+ for (int j = 0; j < groupFields.length; j++) {
+ tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, j);
+ }
+ aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
- if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!appender.appendSkipEmptyField(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
- throw new HyracksDataException(
- "The output cannot be fit into a frame.");
- }
- }
+ if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new HyracksDataException("The output cannot be fit into a frame.");
+ }
+ }
- }
+ }
- private boolean sameGroup(FrameTupleAccessor a1, int t1Idx,
- FrameTupleAccessor a2, int t2Idx) {
- for (int i = 0; i < comparators.length; ++i) {
- int fIdx = groupFields[i];
- int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength()
- + a1.getFieldStartOffset(t1Idx, fIdx);
- int l1 = a1.getFieldLength(t1Idx, fIdx);
- int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength()
- + a2.getFieldStartOffset(t2Idx, fIdx);
- int l2 = a2.getFieldLength(t2Idx, fIdx);
- if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2
- .getBuffer().array(), s2, l2) != 0) {
- return false;
- }
- }
- return true;
- }
+ private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+ for (int i = 0; i < comparators.length; ++i) {
+ int fIdx = groupFields[i];
+ int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+ int l1 = a1.getFieldLength(t1Idx, fIdx);
+ int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+ int l2 = a2.getFieldLength(t2Idx, fIdx);
+ if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
- @Override
- public void close() throws HyracksDataException {
- if (!first) {
- writeOutput(copyFrameAccessor,
- copyFrameAccessor.getTupleCount() - 1);
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
- }
- aggregateState.close();
- writer.close();
- }
+ @Override
+ public void close() throws HyracksDataException {
+ if (!first) {
+ writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ }
+ aggregateState.close();
+ writer.close();
+ }
}
\ No newline at end of file
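
PreclusteredGroupWriter copies every frame it finishes (FrameUtils.copy at the end of nextFrame) so that when the next frame arrives, the last tuple of the previous one is still addressable for the sameGroup(...) test, and so that close() can flush the final group. A dependency-free sketch of that boundary logic on a sorted stream, with illustrative names:

    // Group-boundary detection shaped like switchGroupIfRequired above:
    // compare each key with its predecessor; on a change, emit the finished
    // group and start a new one; the trailing emit plays the role of close().
    final class BoundaryDemo {
        public static void main(String[] args) {
            int[] sortedKeys = { 1, 1, 2, 2, 2, 5 };
            int prev = sortedKeys[0];
            int count = 1; // running aggregate for the current group
            for (int i = 1; i < sortedKeys.length; i++) {
                if (sortedKeys[i] != prev) {
                    System.out.println("group " + prev + " -> count " + count);
                    prev = sortedKeys[i];
                    count = 0;
                }
                count++;
            }
            System.out.println("group " + prev + " -> count " + count);
        }
    }
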
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
index 66eed9d..2e781b5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
@@ -31,41 +31,46 @@
*
*/
public class AvgFieldGroupAggregatorFactory implements IFieldAggregateDescriptorFactory {
-
+
private static final long serialVersionUID = 1L;
-
+
private final int aggField;
-
+
private final boolean useObjectState;
-
- public AvgFieldGroupAggregatorFactory(int aggField, boolean useObjectState){
+
+ public AvgFieldGroupAggregatorFactory(int aggField, boolean useObjectState) {
this.aggField = aggField;
this.useObjectState = useObjectState;
}
-
- /* (non-Javadoc)
- * @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory
+ * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
*/
@Override
- public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor) throws HyracksDataException {
-
+
return new IFieldAggregateDescriptor() {
-
+
@Override
public void reset() {
}
-
+
@Override
- public void outputPartialResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException {
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+ throws HyracksDataException {
int sum, count;
if (!useObjectState) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
count = IntegerSerializerDeserializer.getInt(data, offset + 4);
} else {
- Integer[] fields = (Integer[])state.state;
+ Integer[] fields = (Integer[]) state.state;
sum = fields[0];
count = fields[1];
}
@@ -73,74 +78,65 @@
fieldOutput.writeInt(sum);
fieldOutput.writeInt(count);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
}
-
+
@Override
- public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException {
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+ throws HyracksDataException {
int sum, count;
if (!useObjectState) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
count = IntegerSerializerDeserializer.getInt(data, offset + 4);
} else {
- Integer[] fields = (Integer[])state.state;
+ Integer[] fields = (Integer[]) state.state;
sum = fields[0];
count = fields[1];
}
try {
- fieldOutput.writeFloat((float)sum/count);
+ fieldOutput.writeFloat((float) sum / count);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
}
-
+
@Override
- public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, AggregateState state)
+ public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
int sum = 0;
int count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
count += 1;
if (!useObjectState) {
try {
fieldOutput.writeInt(sum);
fieldOutput.writeInt(count);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
}
} else {
- state.state = new Integer[]{sum, count};
+ state.state = new Integer[] { sum, count };
}
}
-
+
@Override
public void close() {
// TODO Auto-generated method stub
-
+
}
-
+
@Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, AggregateState state)
- throws HyracksDataException {
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
int sum = 0, count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
count += 1;
if (!useObjectState) {
ByteBuffer buf = ByteBuffer.wrap(data);
@@ -149,10 +145,10 @@
buf.putInt(offset, sum);
buf.putInt(offset + 4, count);
} else {
- Integer[] fields = (Integer[])state.state;
+ Integer[] fields = (Integer[]) state.state;
sum += fields[0];
count += fields[1];
- state.state = new Integer[]{sum, count};
+ state.state = new Integer[] { sum, count };
}
}
@@ -160,15 +156,15 @@
public boolean needsObjectState() {
return useObjectState;
}
-
+
@Override
public boolean needsBinaryState() {
return !useObjectState;
}
-
+
@Override
public AggregateState createState() {
- return new AggregateState(new Integer[]{0, 0});
+ return new AggregateState(new Integer[] { 0, 0 });
}
};
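
Note that the average aggregator above never stores an average: its running state is the (sum, count) pair, outputPartialResult writes both integers, and only outputFinalResult performs the float division. Keeping the pair is what makes partial results mergeable across a partial/merge pipeline. A minimal sketch of that invariant, with illustrative names:

    // AVG kept as a mergeable (sum, count) pair; division happens only at the end.
    final class AvgPair {
        int sum, count;

        void add(int v)           { sum += v; count++; }                      // raw input
        void merge(AvgPair other) { sum += other.sum; count += other.count; } // partials
        float finish()            { return (float) sum / count; }             // final only

        public static void main(String[] args) {
            AvgPair a = new AvgPair(); a.add(2); a.add(4);
            AvgPair b = new AvgPair(); b.add(9);
            a.merge(b);
            System.out.println(a.finish()); // 5.0
        }
    }
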
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
index cc30641..cc5c1e1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
@@ -30,43 +30,47 @@
/**
*
*/
-public class AvgFieldMergeAggregatorFactory implements
- IFieldAggregateDescriptorFactory {
+public class AvgFieldMergeAggregatorFactory implements IFieldAggregateDescriptorFactory {
private static final long serialVersionUID = 1L;
-
+
private final int aggField;
-
+
private final boolean useObjectState;
-
+
public AvgFieldMergeAggregatorFactory(int aggField, boolean useObjectState) {
this.aggField = aggField;
this.useObjectState = useObjectState;
}
-
- /* (non-Javadoc)
- * @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory
+ * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
*/
@Override
- public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor) throws HyracksDataException {
-
+
return new IFieldAggregateDescriptor() {
-
+
@Override
public void reset() {
}
-
+
@Override
- public void outputPartialResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException {
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+ throws HyracksDataException {
int sum, count;
if (!useObjectState) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
count = IntegerSerializerDeserializer.getInt(data, offset + 4);
} else {
- Integer[] fields = (Integer[])state.state;
+ Integer[] fields = (Integer[]) state.state;
sum = fields[0];
count = fields[1];
}
@@ -74,48 +78,43 @@
fieldOutput.writeInt(sum);
fieldOutput.writeInt(count);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
}
-
+
@Override
- public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException {
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+ throws HyracksDataException {
int sum, count;
if (!useObjectState) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
count = IntegerSerializerDeserializer.getInt(data, offset + 4);
} else {
- Integer[] fields = (Integer[])state.state;
+ Integer[] fields = (Integer[]) state.state;
sum = fields[0];
count = fields[1];
}
try {
- fieldOutput.writeFloat((float)sum/count);
+ fieldOutput.writeFloat((float) sum / count);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
}
-
+
@Override
public void close() {
// TODO Auto-generated method stub
-
+
}
-
+
@Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, AggregateState state)
- throws HyracksDataException {
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
int sum = 0, count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
count += 1;
if (!useObjectState) {
ByteBuffer buf = ByteBuffer.wrap(data);
@@ -124,10 +123,10 @@
buf.putInt(offset, sum);
buf.putInt(offset + 4, count);
} else {
- Integer[] fields = (Integer[])state.state;
+ Integer[] fields = (Integer[]) state.state;
sum += fields[0];
count += fields[1];
- state.state = new Integer[]{sum, count};
+ state.state = new Integer[] { sum, count };
}
}
@@ -135,47 +134,40 @@
public boolean needsObjectState() {
return useObjectState;
}
-
+
@Override
public boolean needsBinaryState() {
return !useObjectState;
}
-
+
@Override
public AggregateState createState() {
- return new AggregateState(new Integer[]{0, 0});
+ return new AggregateState(new Integer[] { 0, 0 });
}
-
+
@Override
- public void init(IFrameTupleAccessor accessor,
- int tIndex, DataOutput fieldOutput, AggregateState state)
+ public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
int sum = 0;
int count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- count += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart + 4);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+ count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
if (!useObjectState) {
try {
fieldOutput.writeInt(sum);
fieldOutput.writeInt(count);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
}
} else {
- state.state = new Integer[]{sum, count};
+ state.state = new Integer[] { sum, count };
}
}
};
}
}
-
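
The merge variant's init(...) above reads an upstream (sum, count) pair, getInt at fieldStart and again at fieldStart + 4, instead of a raw input value as in the group variant. A sketch of that 8-byte partial-result layout, with illustrative names:

    import java.nio.ByteBuffer;

    // The binary layout the merge aggregator consumes:
    // sum at the field start, count four bytes later.
    final class SumCountLayout {
        public static void main(String[] args) {
            ByteBuffer field = ByteBuffer.allocate(8);
            field.putInt(0, 42); // sum
            field.putInt(4, 6);  // count
            System.out.println(field.getInt(0) + " / " + field.getInt(4)); // 42 / 6
        }
    }
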
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index e320603..9bfec8e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -30,8 +30,7 @@
/**
*
*/
-public class CountFieldAggregatorFactory implements
- IFieldAggregateDescriptorFactory {
+public class CountFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
private static final long serialVersionUID = 1L;
@@ -51,8 +50,7 @@
* edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
*/
@Override
- public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor) throws HyracksDataException {
return new IFieldAggregateDescriptor() {
@@ -61,8 +59,7 @@
}
@Override
- public void outputPartialResult(DataOutput fieldOutput,
- byte[] data, int offset, AggregateState state)
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
throws HyracksDataException {
int count;
if (!useObjectState) {
@@ -73,14 +70,12 @@
try {
fieldOutput.writeInt(count);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
}
@Override
- public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state)
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
throws HyracksDataException {
int count;
if (!useObjectState) {
@@ -91,22 +86,19 @@
try {
fieldOutput.writeInt(count);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
}
@Override
- public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, AggregateState state)
+ public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
int count = 1;
if (!useObjectState) {
try {
fieldOutput.writeInt(count);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
}
} else {
state.state = count;
@@ -131,9 +123,8 @@
}
@Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, AggregateState state)
- throws HyracksDataException {
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
int count = 1;
if (!useObjectState) {
ByteBuffer buf = ByteBuffer.wrap(data);
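
COUNT needs no input field at all: init(...) writes 1 because the tuple that opens the group counts itself, and each aggregate(...) call adds 1, in binary mode via the ByteBuffer.wrap pattern visible above. A short sketch of that in-place increment; the class name is ours:

    import java.nio.ByteBuffer;

    // In-place binary COUNT: wrap the state bytes, bump the int at its offset.
    final class BinaryCount {
        public static void main(String[] args) {
            byte[] state = new byte[4];
            int offset = 0;
            ByteBuffer buf = ByteBuffer.wrap(state);
            buf.putInt(offset, 1);                      // init: first tuple counts
            buf.putInt(offset, buf.getInt(offset) + 1); // aggregate: one more tuple
            System.out.println(buf.getInt(offset));     // 2
        }
    }
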
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index a3c0aa9..7d85deb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -30,8 +30,7 @@
/**
*
*/
-public class IntSumFieldAggregatorFactory implements
- IFieldAggregateDescriptorFactory {
+public class IntSumFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
private static final long serialVersionUID = 1L;
@@ -54,8 +53,7 @@
* edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
*/
@Override
- public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor) throws HyracksDataException {
return new IFieldAggregateDescriptor() {
@@ -65,8 +63,7 @@
}
@Override
- public void outputPartialResult(DataOutput fieldOutput,
- byte[] data, int offset, AggregateState state)
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
throws HyracksDataException {
int sum;
if (!useObjectState) {
@@ -77,14 +74,12 @@
try {
fieldOutput.writeInt(sum);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
}
@Override
- public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state)
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
throws HyracksDataException {
int sum;
if (!useObjectState) {
@@ -95,31 +90,26 @@
try {
fieldOutput.writeInt(sum);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
}
@Override
- public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, AggregateState state)
+ public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
int sum = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
if (!useObjectState) {
try {
fieldOutput.writeInt(sum);
} catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
}
} else {
state.state = sum;
@@ -144,16 +134,13 @@
}
@Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, AggregateState state)
- throws HyracksDataException {
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
int sum = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
if (!useObjectState) {
ByteBuffer buf = ByteBuffer.wrap(data);
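The sum aggregator's reflowed call sites all compute the same offset: tupleOffset + fieldSlotsLength + fieldStart locates the aggregated field inside the frame's backing array. A self-contained sketch of that arithmetic, with a stand-in for IntegerSerializerDeserializer.getInt (assumed big-endian, which is ByteBuffer's default):

    import java.nio.ByteBuffer;

    public class IntSumSketch {
        // Mirrors the offset arithmetic in the reformatted call: the field's
        // bytes start at tupleOffset + fieldSlotsLength + fieldStart, and the
        // field itself is a 4-byte int.
        static int readIntField(byte[] frame, int tupleOffset, int fieldSlotsLength, int fieldStart) {
            return ByteBuffer.wrap(frame).getInt(tupleOffset + fieldSlotsLength + fieldStart);
        }

        public static void main(String[] args) {
            // A toy "frame": 8 bytes of slot header, then one int field holding 42.
            ByteBuffer frame = ByteBuffer.allocate(12);
            frame.putInt(8, 42);
            int sum = 0;
            sum += readIntField(frame.array(), 0, 8, 0);
            System.out.println(sum); // prints 42
        }
    }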
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index 9c6ff68..94ebcbd 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -33,8 +33,7 @@
/**
*
*/
-public class MinMaxStringFieldAggregatorFactory implements
- IFieldAggregateDescriptorFactory {
+public class MinMaxStringFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
private static final long serialVersionUID = 1L;
@@ -44,8 +43,7 @@
private final boolean hasBinaryState;
- public MinMaxStringFieldAggregatorFactory(int aggField, boolean isMax,
- boolean hasBinaryState) {
+ public MinMaxStringFieldAggregatorFactory(int aggField, boolean isMax, boolean hasBinaryState) {
this.aggField = aggField;
this.isMax = isMax;
this.hasBinaryState = hasBinaryState;
@@ -61,8 +59,7 @@
* edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
*/
@Override
- public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor) throws HyracksDataException {
return new IFieldAggregateDescriptor() {
@@ -71,13 +68,11 @@
}
@Override
- public void outputPartialResult(DataOutput fieldOutput,
- byte[] data, int offset, AggregateState state)
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
throws HyracksDataException {
try {
if (hasBinaryState) {
- int stateIdx = IntegerSerializerDeserializer.getInt(
- data, offset);
+ int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
Object[] storedState = (Object[]) state.state;
fieldOutput.writeUTF((String) storedState[stateIdx]);
} else {
@@ -90,13 +85,11 @@
}
@Override
- public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state)
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
throws HyracksDataException {
try {
if (hasBinaryState) {
- int stateIdx = IntegerSerializerDeserializer.getInt(
- data, offset);
+ int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
Object[] storedState = (Object[]) state.state;
fieldOutput.writeUTF((String) storedState[stateIdx]);
} else {
@@ -109,18 +102,14 @@
}
@Override
- public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, AggregateState state)
+ public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
int fieldLength = accessor.getFieldLength(tIndex, aggField);
- String strField = UTF8StringSerializerDeserializer.INSTANCE
- .deserialize(new DataInputStream(
- new ByteArrayInputStream(accessor.getBuffer()
- .array(), tupleOffset
- + accessor.getFieldSlotsLength()
- + fieldStart, fieldLength)));
+ String strField = UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset
+ + accessor.getFieldSlotsLength() + fieldStart, fieldLength)));
if (hasBinaryState) {
// Object-binary-state
Object[] storedState;
@@ -133,8 +122,7 @@
}
int stateCount = (Integer) (storedState[0]);
if (stateCount + 1 >= storedState.length) {
- storedState = Arrays.copyOf(storedState,
- storedState.length * 2);
+ storedState = Arrays.copyOf(storedState, storedState.length * 2);
state.state = storedState;
}
@@ -159,44 +147,35 @@
}
@Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, AggregateState state)
- throws HyracksDataException {
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
int fieldLength = accessor.getFieldLength(tIndex, aggField);
- String strField = UTF8StringSerializerDeserializer.INSTANCE
- .deserialize(new DataInputStream(
- new ByteArrayInputStream(accessor.getBuffer()
- .array(), tupleOffset
- + accessor.getFieldSlotsLength()
- + fieldStart, fieldLength)));
+ String strField = UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset
+ + accessor.getFieldSlotsLength() + fieldStart, fieldLength)));
if (hasBinaryState) {
- int stateIdx = IntegerSerializerDeserializer.getInt(data,
- offset);
+ int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
Object[] storedState = (Object[]) state.state;
if (isMax) {
- if (strField.length() > ((String) (storedState[stateIdx]))
- .length()) {
+ if (strField.length() > ((String) (storedState[stateIdx])).length()) {
storedState[stateIdx] = strField;
}
} else {
- if (strField.length() < ((String) (storedState[stateIdx]))
- .length()) {
+ if (strField.length() < ((String) (storedState[stateIdx])).length()) {
storedState[stateIdx] = strField;
}
}
} else {
if (isMax) {
- if (strField.length() > ((String) (state.state))
- .length()) {
+ if (strField.length() > ((String) (state.state)).length()) {
state.state = strField;
}
} else {
- if (strField.length() < ((String) (state.state))
- .length()) {
+ if (strField.length() < ((String) (state.state)).length()) {
state.state = strField;
}
}
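Worth noting while reading the min/max hunks: the aggregator keeps the longer (isMax) or shorter string by length(), not by lexicographic order. A standalone sketch of exactly that comparison (names are illustrative, not from the patch):

    public class MinMaxLenSketch {
        // As in the patched aggregator, "max" keeps the longer string and
        // "min" the shorter one -- the comparison is on length alone.
        static String combine(String current, String incoming, boolean isMax) {
            if (isMax ? incoming.length() > current.length()
                      : incoming.length() < current.length()) {
                return incoming;
            }
            return current;
        }

        public static void main(String[] args) {
            String state = "ab";
            state = combine(state, "abcd", true); // longer string wins under max
            System.out.println(state); // prints abcd
        }
    }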
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 81eb45f..5c6e94d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -30,21 +30,18 @@
/**
*
*/
-public class MultiFieldsAggregatorFactory implements
- IAggregatorDescriptorFactory {
+public class MultiFieldsAggregatorFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
private int[] keys;
- public MultiFieldsAggregatorFactory(int[] keys,
- IFieldAggregateDescriptorFactory[] aggregatorFactories) {
+ public MultiFieldsAggregatorFactory(int[] keys, IFieldAggregateDescriptorFactory[] aggregatorFactories) {
this.keys = keys;
this.aggregatorFactories = aggregatorFactories;
}
-
- public MultiFieldsAggregatorFactory(
- IFieldAggregateDescriptorFactory[] aggregatorFactories) {
+
+ public MultiFieldsAggregatorFactory(IFieldAggregateDescriptorFactory[] aggregatorFactories) {
this.aggregatorFactories = aggregatorFactories;
}
@@ -58,21 +55,18 @@
* edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
*/
@Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, final int[] keyFields,
- final int[] keyFieldsInPartialResults) throws HyracksDataException {
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
for (int i = 0; i < aggregators.length; i++) {
- aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
- inRecordDescriptor, outRecordDescriptor);
+ aggregators[i] = aggregatorFactories[i].createAggregator(ctx, inRecordDescriptor, outRecordDescriptor);
}
-
- if(this.keys == null){
+
+ if (this.keys == null) {
this.keys = keyFields;
}
-
return new IAggregatorDescriptor() {
@@ -84,41 +78,34 @@
}
@Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex,
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
tupleBuilder.reset();
for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- tupleBuilder.addField(accessor, tIndex,
- keyFieldsInPartialResults[i]);
+ tupleBuilder.addField(accessor, tIndex, keyFieldsInPartialResults[i]);
}
DataOutput dos = tupleBuilder.getDataOutput();
int tupleOffset = accessor.getTupleStartOffset(tIndex);
for (int i = 0; i < aggregators.length; i++) {
- int fieldOffset = accessor.getFieldStartOffset(tIndex,
- keys.length + i);
- aggregators[i].outputPartialResult(dos, accessor
- .getBuffer().array(),
- fieldOffset + accessor.getFieldSlotsLength()
- + tupleOffset, ((AggregateState[]) state
- .state)[i]);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, keys.length + i);
+ aggregators[i].outputPartialResult(dos, accessor.getBuffer().array(),
+ fieldOffset + accessor.getFieldSlotsLength() + tupleOffset,
+ ((AggregateState[]) state.state)[i]);
tupleBuilder.addFieldEndOffset();
}
}
@Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex,
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
tupleBuilder.reset();
for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- tupleBuilder.addField(accessor, tIndex,
- keyFieldsInPartialResults[i]);
+ tupleBuilder.addField(accessor, tIndex, keyFieldsInPartialResults[i]);
}
DataOutput dos = tupleBuilder.getDataOutput();
@@ -126,26 +113,21 @@
int tupleOffset = accessor.getTupleStartOffset(tIndex);
for (int i = 0; i < aggregators.length; i++) {
if (aggregators[i].needsBinaryState()) {
- int fieldOffset = accessor.getFieldStartOffset(tIndex,
- keys.length + i);
- aggregators[i].outputFinalResult(dos, accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldOffset,
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, keys.length + i);
+ aggregators[i].outputFinalResult(dos, accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldOffset,
((AggregateState[]) state.state)[i]);
} else {
- aggregators[i].outputFinalResult(dos, null, 0,
- ((AggregateState[]) state.state)[i]);
+ aggregators[i].outputFinalResult(dos, null, 0, ((AggregateState[]) state.state)[i]);
}
tupleBuilder.addFieldEndOffset();
}
}
@Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex,
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
-
+
tupleBuilder.reset();
for (int i = 0; i < keys.length; i++) {
tupleBuilder.addField(accessor, tIndex, keys[i]);
@@ -153,8 +135,7 @@
DataOutput dos = tupleBuilder.getDataOutput();
for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].init(accessor, tIndex, dos,
- ((AggregateState[]) state.state)[i]);
+ aggregators[i].init(accessor, tIndex, dos, ((AggregateState[]) state.state)[i]);
if (aggregators[i].needsBinaryState()) {
tupleBuilder.addFieldEndOffset();
}
@@ -178,37 +159,26 @@
}
@Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
if (stateAccessor != null) {
- int stateTupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
+ int stateTupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
int fieldIndex = 0;
for (int i = 0; i < aggregators.length; i++) {
if (aggregators[i].needsBinaryState()) {
- int stateFieldOffset = stateAccessor
- .getFieldStartOffset(stateTupleIndex,
- keys.length + fieldIndex);
- aggregators[i].aggregate(
- accessor,
- tIndex,
- stateAccessor.getBuffer().array(),
- stateTupleOffset
- + stateAccessor
- .getFieldSlotsLength()
- + stateFieldOffset,
+ int stateFieldOffset = stateAccessor.getFieldStartOffset(stateTupleIndex, keys.length
+ + fieldIndex);
+ aggregators[i].aggregate(accessor, tIndex, stateAccessor.getBuffer().array(),
+ stateTupleOffset + stateAccessor.getFieldSlotsLength() + stateFieldOffset,
((AggregateState[]) state.state)[i]);
fieldIndex++;
} else {
- aggregators[i].aggregate(accessor, tIndex, null, 0,
- ((AggregateState[]) state.state)[i]);
+ aggregators[i].aggregate(accessor, tIndex, null, 0, ((AggregateState[]) state.state)[i]);
}
}
} else {
for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].aggregate(accessor, tIndex, null, 0,
- ((AggregateState[]) state.state)[i]);
+ aggregators[i].aggregate(accessor, tIndex, null, 0, ((AggregateState[]) state.state)[i]);
}
}
}
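MultiFieldsAggregatorFactory's role is delegation: every init/aggregate/output call fans out to one field aggregator per aggregated column, each with its own slot in the AggregateState array. A toy composite showing the same fan-out shape, with a local FieldAgg stand-in rather than the real IFieldAggregateDescriptor:

    public class MultiFieldSketch {
        // Hypothetical per-field aggregator; the composite below just fans
        // each call out to every element, as the factory above does.
        interface FieldAgg {
            long init(int value);
            long aggregate(long state, int value);
        }

        public static void main(String[] args) {
            FieldAgg count = new FieldAgg() {
                public long init(int v) { return 1; }
                public long aggregate(long s, int v) { return s + 1; }
            };
            FieldAgg sum = new FieldAgg() {
                public long init(int v) { return v; }
                public long aggregate(long s, int v) { return s + v; }
            };
            FieldAgg[] aggs = { count, sum };
            int[] input = { 5, 7, 11 };

            long[] states = new long[aggs.length];
            for (int i = 0; i < aggs.length; i++) {
                states[i] = aggs[i].init(input[0]); // initialize from the first tuple
            }
            for (int t = 1; t < input.length; t++) {
                for (int i = 0; i < aggs.length; i++) {
                    states[i] = aggs[i].aggregate(states[i], input[t]);
                }
            }
            System.out.println(states[0] + " " + states[1]); // prints 3 23
        }
    }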
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
index eb58cb9..84caadf 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
@@ -72,8 +72,8 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
index b0e3dbe..85a6912 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
@@ -40,8 +40,8 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions) throws HyracksDataException {
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new ConstantTupleSourceOperatorNodePushable(ctx, fieldSlots, tupleData, tupleSize);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index b6a96e3..8e0ac79 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -32,8 +32,8 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new AbstractUnaryInputSinkOperatorNodePushable() {
@Override
public void open() throws HyracksDataException {
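NullSinkOperatorDescriptor's anonymous pushable is the smallest instance of the push contract these createPushRuntime methods return: open, accept frames, close, and in this case discard everything. A runnable sketch with a locally defined stand-in for the writer interface (not the Hyracks one):

    import java.nio.ByteBuffer;

    public class NullSinkSketch {
        // Local stand-in for the push-based writer contract; the real
        // Hyracks interfaces differ in detail.
        interface FrameWriter {
            void open();
            void nextFrame(ByteBuffer frame);
            void close();
        }

        public static void main(String[] args) {
            // A "null sink" in this model: accept every frame and drop it,
            // the same shape as the anonymous pushable in the hunk above.
            FrameWriter nullSink = new FrameWriter() {
                public void open() { /* nothing to set up */ }
                public void nextFrame(ByteBuffer frame) { /* discard */ }
                public void close() { /* nothing to release */ }
            };
            nullSink.open();
            nullSink.nextFrame(ByteBuffer.allocate(128));
            nullSink.close();
            System.out.println("all frames discarded");
        }
    }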
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 2d4a4d0..7effbb0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -61,8 +61,8 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(),
recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 5d62607..6874f77 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -24,8 +24,8 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions)
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
return new AbstractUnaryInputOperatorNodePushable() {
private final IFrameWriter[] writers = new IFrameWriter[outputArity];
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index e7fb9ac..3e99000 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -72,8 +72,8 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
private CollectTaskState state;
@@ -117,8 +117,8 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
private IOpenableDataWriter<Object[]> writer;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index fcf4978..f55eb3d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -55,8 +55,8 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions)
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
return new UnionOperator(ctx, inRecordDesc);
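The union operator forwards tuples from every input to a single output with the same record descriptor on both sides, keeping duplicates. A trivial model of that pass-through semantics (pure illustration, no Hyracks types):

    import java.util.ArrayList;
    import java.util.List;

    public class UnionAllSketch {
        public static void main(String[] args) {
            // Union-all: concatenate the inputs unchanged, no de-duplication.
            List<int[]> inputA = List.of(new int[] { 1, 2 }, new int[] { 3, 4 });
            List<int[]> inputB = List.of(new int[] { 1, 2 });
            List<int[]> output = new ArrayList<>();
            output.addAll(inputA); // duplicates are kept
            output.addAll(inputB);
            System.out.println(output.size()); // prints 3
        }
    }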