clean up code formatting and comments
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@526 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
index 123fedf..c4d623c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
@@ -24,16 +24,8 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-/**
- * @author jarodwen
- */
public class AvgAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-
- /**
- *
- */
private static final long serialVersionUID = 1L;
-
private final int avgField;
private int outField = -1;
@@ -46,9 +38,6 @@
this.outField = outField;
}
- /* (non-Javadoc)
- * @see edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
- */
@Override
public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException {
@@ -120,7 +109,6 @@
@Override
public void close() {
- // TODO Auto-generated method stub
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
index ef6a139..90edf4a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
@@ -28,15 +28,9 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-/**
- * @author jarodwen
- */
public class ConcatAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-
private static final long serialVersionUID = 1L;
-
private static final int INIT_ACCUMULATORS_SIZE = 8;
-
private final int concatField;
private int outField = -1;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
index 3a42fbc..c5f0a42 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
@@ -24,13 +24,8 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-/**
- * @author jarodwen
- */
public class CountAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-
private static final long serialVersionUID = 1L;
-
private int outField = -1;
public CountAggregatorDescriptorFactory() {
@@ -40,9 +35,6 @@
this.outField = outField;
}
- /* (non-Javadoc)
- * @see edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
- */
@Override
public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
@@ -103,7 +95,6 @@
@Override
public void reset() {
- // TODO Auto-generated method stub
}
};
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
index 85a182f..e07b123 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
@@ -24,12 +24,8 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-/**
- * @author jarodwen
- */
public class IntSumAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
-
private final int aggField;
private int outField = -1;
@@ -115,8 +111,7 @@
@Override
public void reset() {
- // TODO Auto-generated method stub
-
+
}
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
index d30b2b2..58d0da7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
@@ -20,22 +20,14 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-/**
- * @author jarodwen
- */
public class MultiAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-
private static final long serialVersionUID = 1L;
-
private final IAggregatorDescriptorFactory[] aggregatorFactories;
public MultiAggregatorDescriptorFactory(IAggregatorDescriptorFactory[] aggregatorFactories) {
this.aggregatorFactories = aggregatorFactories;
}
- /* (non-Javadoc)
- * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
- */
@Override
public IAggregatorDescriptor createAggregator(final IHyracksStageletContext ctx,
final RecordDescriptor inRecordDescriptor, final RecordDescriptor outRecordDescriptor, final int[] keyFields)
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index cdb0f4d..65f7f08 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -52,7 +52,6 @@
import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
private static final long serialVersionUID = 1L;
/**
* The input frame identifier (in the job environment)
@@ -78,8 +77,10 @@
super(spec, 1, 1);
this.framesLimit = framesLimit;
if (framesLimit <= 1) {
- // Minimum of 2 frames: 1 for input records, and 1 for output
- // aggregation results.
+ /**
+ * Minimum of 2 frames: 1 for input records, and 1 for output
+ * aggregation results.
+ */
throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
}
@@ -91,9 +92,11 @@
this.spillableTableFactory = spillableTableFactory;
this.isOutputSorted = isOutputSorted;
- // Set the record descriptor. Note that since
- // this operator is a unary operator,
- // only the first record descriptor is used here.
+ /**
+ * Set the record descriptor. Note that since
+ * this operator is a unary operator,
+ * only the first record descriptor is used here.
+ */
recordDescriptors[0] = recordDescriptor;
}
@@ -148,8 +151,10 @@
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
- // If the group table is too large, flush the table into
- // a run file.
+ /**
+ * If the group table is too large, flush the table into
+ * a run file.
+ */
if (!gTable.insert(accessor, i)) {
flushFramesToRun();
if (!gTable.insert(accessor, i))
@@ -168,10 +173,14 @@
public void close() throws HyracksDataException {
if (gTable.getFrameCount() >= 0) {
if (runs.size() <= 0) {
- // All in memory
+ /**
+ * All in memory
+ */
env.set(GROUPTABLES, gTable);
} else {
- // flush the memory into the run file.
+ /**
+ * flush the memory into the run file.
+ */
flushFramesToRun();
gTable.close();
}
@@ -225,11 +234,15 @@
final IAggregatorDescriptor currentWorkingAggregator = mergeFactory.createAggregator(ctx,
recordDescriptors[0], recordDescriptors[0], keyFields);
final int[] storedKeys = new int[keyFields.length];
- // Get the list of the fields in the stored records.
+ /**
+ * Get the list of the fields in the stored records.
+ */
for (int i = 0; i < keyFields.length; ++i) {
storedKeys[i] = i;
}
- // Tuple builder
+ /**
+ * Tuple builder
+ */
final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -320,19 +333,20 @@
try {
currentFrameIndexInRun = new int[runNumber];
currentRunFrames = new int[runNumber];
- // Create file readers for each input run file, only
- // for the ones fit into the inFrames
+ /**
+ * Create file readers for each input run file, only
+ * for the ones fit into the inFrames
+ */
RunFileReader[] runFileReaders = new RunFileReader[runNumber];
- // Create input frame accessor
FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
- // Build a priority queue for extracting tuples in order
Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-
ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
recordDescriptors[0], runNumber, comparator);
- // Maintain a list of visiting index for all runs'
- // current frame
+ /**
+ * current tuple index in each run
+ */
int[] tupleIndices = new int[runNumber];
+
for (int runIndex = runNumber - 1; runIndex >= 0; runIndex--) {
tupleIndices[runIndex] = 0;
// Load the run file
@@ -357,9 +371,13 @@
}
}
- // Start merging
+ /**
+ * Start merging
+ */
while (!topTuples.areRunsExhausted()) {
- // Get the top record
+ /**
+ * Get the top record
+ */
ReferenceEntry top = topTuples.peek();
int tupleIndex = top.getTupleIndex();
int runIndex = topTuples.peek().getRunid();
@@ -368,8 +386,10 @@
int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
if (currentTupleInOutFrame < 0
|| compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
- // Initialize the first output record
- // Reset the tuple builder
+ /**
+ * Initialize the first output record
+ * Reset the tuple builder
+ */
tupleBuilder.reset();
for (int i = 0; i < keyFields.length; i++) {
tupleBuilder.addField(fta, tupleIndex, i);
@@ -378,9 +398,6 @@
currentWorkingAggregator.init(fta, tupleIndex, tupleBuilder);
if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
- // Make sure that when the outFrame is being
- // flushed, all results in it are in
- // the correct state
flushOutFrame(writer, finalPass);
if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()))
@@ -388,9 +405,11 @@
"Failed to append an aggregation result to the output frame.");
}
} else {
- // if new tuple is in the same group of the
- // current aggregator
- // do merge and output to the outFrame
+ /**
+ * if new tuple is in the same group of the
+ * current aggregator
+ * do merge and output to the outFrame
+ */
int tupleOffset = outFrameAccessor.getTupleStartOffset(currentTupleInOutFrame);
int fieldOffset = outFrameAccessor.getFieldStartOffset(currentTupleInOutFrame,
keyFields.length);
@@ -403,15 +422,17 @@
tupleIndices[runIndex]++;
setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
}
- // Flush the outFrame
+
if (outFrameAppender.getTupleCount() > 0) {
flushOutFrame(writer, finalPass);
}
- // After processing all records, flush the aggregator
+
currentWorkingAggregator.close();
runs.subList(0, runNumber).clear();
- // insert the new run file into the beginning of the run
- // file list
+ /**
+ * insert the new run file into the beginning of the run
+ * file list
+ */
if (!finalPass) {
runs.add(0, ((RunFileWriter) writer).createReader());
}
@@ -467,10 +488,14 @@
int runStart = runIndex * runFrameLimit;
boolean existNext = false;
if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
- // run already closed
+ /**
+ * run already closed
+ */
existNext = false;
} else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
- // not the last frame for this run
+ /**
+ * not the last frame for this run
+ */
existNext = true;
if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
tupleIndices[runIndex] = 0;
@@ -478,7 +503,9 @@
}
} else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]]
.getTupleCount()) {
- // the last frame has expired
+ /**
+ * the last frame has expired
+ */
existNext = true;
} else {
/**
@@ -507,8 +534,7 @@
}
}
}
- // Check whether the run file for the given runIndex has
- // more tuples
+
if (existNext) {
topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]],
tupleIndices[runIndex]);
@@ -539,10 +565,6 @@
byte[] b1 = fta1.getBuffer().array();
byte[] b2 = fta2.getBuffer().array();
for (int f = 0; f < keyFields.length; ++f) {
- // Note: Since the comparison is only used in the merge
- // phase,
- // all the keys are clustered at the beginning of the
- // tuple.
int fIdx = f;
int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ fta1.getFieldStartOffset(j1, fIdx);
@@ -575,10 +597,6 @@
byte[] b1 = fta1.getBuffer().array();
byte[] b2 = fta2.getBuffer().array();
for (int f = 0; f < keyFields.length; ++f) {
- // Note: Since the comparison is only used in the merge
- // phase,
- // all the keys are clustered at the beginning of the
- // tuple.
int fIdx = f;
int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ fta1.getFieldStartOffset(j1, fIdx);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
index a8b61fc..3cc2fad 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
@@ -42,7 +42,6 @@
import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
public class HashSpillableGroupingTableFactory implements ISpillableTableFactory {
-
private static final long serialVersionUID = 1L;
private final ITuplePartitionComputerFactory tpcf;
private final int tableSize;
@@ -152,8 +151,10 @@
} while (true);
if (!foundGroup) {
- // If no matching group is found, create a new aggregator
- // Create a tuple for the new group
+ /**
+ * If no matching group is found, create a new aggregator
+ * Create a tuple for the new group
+ */
internalTupleBuilder.reset();
for (int i = 0; i < keyFields.length; i++) {
internalTupleBuilder.addField(accessor, tIndex, keyFields[i]);
@@ -334,7 +335,9 @@
offset++;
} while (true);
}
- // Sort using quick sort
+ /**
+ * Sort using quick sort
+ */
if (tPointers.length > 0) {
sort(tPointers, 0, totalTCount);
}
@@ -342,11 +345,10 @@
private void sort(int[] tPointers, int offset, int length) {
int m = offset + (length >> 1);
- // Get table index
int mTable = tPointers[m * 3];
int mRow = tPointers[m * 3 + 1];
int mNormKey = tPointers[m * 3 + 2];
- // Get frame and tuple index
+
table.getTuplePointer(mTable, mRow, storedTuplePointer);
int mFrame = storedTuplePointer.frameIndex;
int mTuple = storedTuplePointer.tupleIndex;