Merge with fullstack_hybridhashgby r2861.

Re-enable writer.open() in HashSpillableTableFactory and drop the corresponding open() from HybridHashSortRunMerger.process(). Refactor HybridHashGroupOperatorDescriptor: track input size in raw tuples, bound the recursion depth, fold the run-file recursion into a single finishAndRecursion() helper, and fall back to the hash-sort algorithm when a run's estimated key cardinality or the recursion level grows too large. Remove the unused HybridHashPartitionGenerateFrameWriter. Rework PreclusteredGroupWriter to keep the current group's partial aggregate in a small result cache instead of copying the previous input frame (a simplified sketch of the new grouping flow follows below).
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2864 123451ca-8445-de46-9d55-352943316053
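
The last file in this diff reworks PreclusteredGroupWriter to aggregate each incoming tuple into a single cached group result and to emit that result only when the group key changes, instead of keeping a copy of the previous input frame. The following simplified, self-contained sketch mirrors that control flow with plain Java types (an integer key and a running sum); it is an illustration of the pre-clustered grouping pattern only, and the class and field names are not part of the Hyracks code.

    import java.util.ArrayList;
    import java.util.List;

    // Standalone illustration (not Hyracks code) of pre-clustered grouping with a
    // single cached group, mirroring the new nextFrame()/close() control flow.
    class PreclusteredSumSketch {
        private Integer cachedKey;                               // key of the group currently being built
        private long cachedSum;                                  // partial aggregate for that group
        final List<long[]> output = new ArrayList<long[]>();     // emitted (key, sum) pairs

        void nextTuple(int key, long value) {
            if (cachedKey != null && cachedKey == key) {
                cachedSum += value;                              // same group: aggregate into the cache
                return;
            }
            if (cachedKey != null) {
                output.add(new long[] { cachedKey, cachedSum }); // group changed: emit the cached result
            }
            cachedKey = key;                                     // start a new cached group
            cachedSum = value;
        }

        void close() {
            if (cachedKey != null) {
                output.add(new long[] { cachedKey, cachedSum }); // flush the last group
            }
        }
    }

Because the input is pre-clustered on the group key, at most one group ever needs to be cached, which is what lets the rewritten writer drop the copy frame.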
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 974ef3b..f2b56fa 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -255,7 +255,8 @@
}
outputAppender.reset(outputFrame, true);
- //writer.open();
+
+ writer.open();
if (tPointers == null) {
// Not sorted
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
index 57a364b..6e85cff 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
@@ -279,7 +279,6 @@
*/
public void flushHashtableToOutput(IFrameWriter outputWriter) throws HyracksDataException {
- // FIXME: remove this
outputAppender.reset(outputBuffer, true);
for (int i = 0; i < contents.length; i++) {
if (contents[i] == null) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
index 8695e0b..a846e4a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
@@ -66,8 +66,7 @@
}
public void process() throws HyracksDataException {
-
- writer.open();
+
// FIXME
int mergeLevels = 0, mergeRunCount = 0;
try {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
index d147a53..b325b83 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
@@ -263,7 +263,7 @@
private void insert(FrameTupleAccessor accessor, int tupleIndex) throws HyracksDataException {
- if (isPartitionOnly) {
+ if (isPartitionOnly) {
// for partition only
int pid = partitionComputer.partition(accessor, tupleIndex, tableSize) % numOfPartitions;
insertSpilledPartition(accessor, tupleIndex, pid);
@@ -582,7 +582,7 @@
}
}
- public List<Integer> getSpilledRunsSizeInTuples() throws HyracksDataException {
+ public List<Integer> getSpilledRunsSizeInRawTuples() throws HyracksDataException {
return spilledPartRunSizesInTuples;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
index dcefc59..0cc718d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -97,10 +98,6 @@
private final boolean doInputAdjustment;
private final static double FUDGE_FACTOR_ESTIMATION = 1.2;
-
-
-
- private int open_times ;
public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int tableSize,
@@ -111,7 +108,6 @@
this(spec, keyFields, framesLimit, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
comparatorFactories, hashFamilies, hashFuncStartLevel, firstNormalizerFactory, aggregatorFactory,
mergerFactory, outRecDesc, true);
- open_times = 0;
}
public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
@@ -153,8 +149,6 @@
recordDescriptors[0] = outRecDesc;
this.doInputAdjustment = doInputAdjustment;
-
- open_times = 0;
}
@Override
@@ -177,7 +171,8 @@
HybridHashGroupHashTable topProcessor;
- int observedInputSizeInFrames;
+ int observedInputSizeInRawTuples;
+ int observedInputSizeInFrames, maxRecursiveLevels;
int userProvidedInputSizeInFrames;
@@ -240,13 +235,11 @@
writer.open();
topProcessor.open();
-
- open_times += 1;
- System.err.println(open_times);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ observedInputSizeInRawTuples += buffer.getInt(buffer.capacity() - 4);
observedInputSizeInFrames++;
topProcessor.nextFrame(buffer);
}
@@ -259,65 +252,24 @@
@Override
public void close() throws HyracksDataException {
- topProcessor.finishup();
+ // estimate the maximum recursive levels
+ maxRecursiveLevels = (int) Math.max(
+ Math.ceil(Math.log(observedInputSizeInFrames * fudgeFactor) / Math.log(framesLimit)) + 1, 1);
- List<IFrameReader> runs = topProcessor.getSpilledRuns();
- List<Integer> runsSizeInFrames = topProcessor.getSpilledRunsSizeInPages();
-
- // get statistics from the hash table
- int hashedKeys = topProcessor.getHashedUniqueKeys();
- int hashedRawRecords = topProcessor.getHashedRawRecords();
-
- // Get the raw record size of each partition (in records but not in pages)
- List<Integer> partitionRawRecordsCount = topProcessor.getSpilledRunsSizeInTuples();
-
- topProcessor.close();
-
- // get a new estimation on the number of keys in the input data set: if the previous level is pure-partition,
- // then use the size inputed in the previous level; otherwise, compute the key ratio in the data set based on
- // the processed keys.
- int newKeySizeInPages = (doInputAdjustment && hashedRawRecords > 0) ? (int) Math
- .ceil((double) hashedKeys / hashedRawRecords * observedInputSizeInFrames) : (int) Math
- .ceil(userProvidedInputSizeInFrames);
-
- IFrameReader runReader;
- int runSizeInFrames;
- int partitionRawRecords;
-
- while (!runs.isEmpty()) {
-
- runReader = runs.remove(0);
- runSizeInFrames = runsSizeInFrames.remove(0);
- partitionRawRecords = partitionRawRecordsCount.remove(0);
-
- // compute the estimated key size in frames for the run file
- int runKeySize;
-
- if (doInputAdjustment && hashedRawRecords > 0)
- runKeySize = (int) Math.ceil((double) newKeySizeInPages * runSizeInFrames
- / observedInputSizeInFrames);
- else
- runKeySize = (int) Math.ceil((double) userProvidedInputSizeInFrames * partitionRawRecords
- / inputSizeInRawRecords);
-
- if (topLevelFallbackCheck && runKeySize > HYBRID_FALLBACK_THRESHOLD * newKeySizeInPages) {
- fallBack(runReader, runSizeInFrames, runKeySize, 1);
- } else {
- processRunFiles(runReader, runKeySize, 1);
- }
- }
+ finishAndRecursion(topProcessor, observedInputSizeInRawTuples, inputSizeInUniqueKeys, 0,
+ topLevelFallbackCheck);
writer.close();
}
- private void processRunFiles(IFrameReader runReader, int uniqueKeysOfRunFileInFrames, int runLevel)
+ private void processRunFiles(IFrameReader runReader, int inputCardinality, int runLevel)
throws HyracksDataException {
boolean checkFallback = true;
- int numOfPartitions = getNumberOfPartitions(tableSize, framesLimit, uniqueKeysOfRunFileInFrames,
- fudgeFactor);
+ int numOfPartitions = getNumberOfPartitions(tableSize, framesLimit, inputCardinality
+ * userProvidedRecordSizeInBytes / frameSize, fudgeFactor);
HybridHashGroupHashTable processor = new HybridHashGroupHashTable(ctx, framesLimit, tableSize,
numOfPartitions, keyFields, runLevel, comparators, tpcf, aggregatorFactory.createAggregator(
@@ -328,70 +280,82 @@
runReader.open();
- int inputRunRawSizeInFrames = 0, inputRunRawSizeInTuples = 0;
+ int inputRunRawSizeInTuples = 0;
if (readAheadBuf == null) {
readAheadBuf = ctx.allocateFrame();
}
while (runReader.nextFrame(readAheadBuf)) {
- inputRunRawSizeInFrames++;
inputRunRawSizeInTuples += readAheadBuf.getInt(readAheadBuf.capacity() - 4);
processor.nextFrame(readAheadBuf);
}
runReader.close();
- processor.finishup();
+ finishAndRecursion(processor, inputRunRawSizeInTuples, inputCardinality, runLevel, checkFallback);
+ }
- List<IFrameReader> runs = processor.getSpilledRuns();
- List<Integer> runSizes = processor.getSpilledRunsSizeInPages();
- List<Integer> partitionRawRecords = processor.getSpilledRunsSizeInTuples();
+ /**
+ * Finish the hash table processing and start recursive processing on run files.
+ *
+ * @param ht the hash table to finish up; its spilled runs are processed recursively
+ * @param inputRawRecordCount number of raw records fed into the hash table
+ * @param inputCardinality current estimate of the unique-key cardinality of that input
+ * @param level recursion level of this invocation (0 for the top level)
+ * @param checkFallback whether to check the fallback-to-hash-sort condition
+ * @throws HyracksDataException
+ */
+ private void finishAndRecursion(HybridHashGroupHashTable ht, long inputRawRecordCount,
+ long inputCardinality, int level, boolean checkFallback) throws HyracksDataException {
- int directFlushKeysInTuples = processor.getHashedUniqueKeys();
- int directFlushRawRecordsInTuples = processor.getHashedRawRecords();
+ ht.finishup();
- processor.close();
+ List<IFrameReader> generatedRunReaders = ht.getSpilledRuns();
+ List<Integer> partitionRawRecords = ht.getSpilledRunsSizeInRawTuples();
- int newKeySizeInPages = (doInputAdjustment && directFlushRawRecordsInTuples > 0) ? (int) Math
- .ceil((double) directFlushKeysInTuples / directFlushRawRecordsInTuples
- * inputRunRawSizeInFrames) : uniqueKeysOfRunFileInFrames;
+ int directFlushKeysInTuples = ht.getHashedUniqueKeys();
+ int directFlushRawRecordsInTuples = ht.getHashedRawRecords();
+
+ ht.close();
+ ht = null;
+
+ ctx.getCounterContext().getCounter("optional.levels." + level + ".estiInputKeyCardinality", true)
+ .update(inputCardinality);
+
+ // do adjustment
+ if (doInputAdjustment && directFlushRawRecordsInTuples > 0) {
+ inputCardinality = (int) Math.ceil((double) directFlushKeysInTuples / directFlushRawRecordsInTuples
+ * inputRawRecordCount);
+ }
+
+ ctx.getCounterContext()
+ .getCounter("optional.levels." + level + ".estiInputKeyCardinalityAdjusted", true)
+ .update(inputCardinality);
IFrameReader recurRunReader;
- int runSizeInPages, subPartitionRawRecords;
+ int subPartitionRawRecords;
- while (!runs.isEmpty()) {
- recurRunReader = runs.remove(0);
- runSizeInPages = runSizes.remove(0);
+ while (!generatedRunReaders.isEmpty()) {
+ recurRunReader = generatedRunReaders.remove(0);
subPartitionRawRecords = partitionRawRecords.remove(0);
- int newRunKeySize;
+ int runKeyCardinality = (int) Math.ceil((double) inputCardinality * subPartitionRawRecords
+ / inputRawRecordCount);
- if (doInputAdjustment && directFlushRawRecordsInTuples > 0) {
- // do adjustment
- newRunKeySize = (int) Math.ceil((double) newKeySizeInPages * runSizeInPages
- / inputRunRawSizeInFrames);
+ if ((checkFallback && runKeyCardinality > HYBRID_FALLBACK_THRESHOLD * inputCardinality)
+ || level > maxRecursiveLevels) {
+ Logger.getLogger(HybridHashGroupOperatorDescriptor.class.getSimpleName()).warning(
+ "Hybrid-hash falls back to hash-sort algorithm! (" + level + ":" + maxRecursiveLevels
+ + ")");
+ fallback(recurRunReader, level);
} else {
- // no adjustment
- newRunKeySize = (int) Math.ceil((double) subPartitionRawRecords * uniqueKeysOfRunFileInFrames
- / inputRunRawSizeInTuples);
- }
-
- if (checkFallback && newRunKeySize > HYBRID_FALLBACK_THRESHOLD * newKeySizeInPages) {
- fallBack(recurRunReader, runSizeInPages, newRunKeySize, runLevel);
- } else {
- processRunFiles(recurRunReader, newRunKeySize, runLevel + 1);
+ processRunFiles(recurRunReader, runKeyCardinality, level + 1);
}
}
}
- private void fallBack(IFrameReader recurRunReader, int runSizeInPages, int runKeySizeInPages, int runLevel)
- throws HyracksDataException {
- fallbackHashSortAlgorithm(recurRunReader, runLevel + 1);
- }
-
- private void fallbackHashSortAlgorithm(IFrameReader recurRunReader, int runLevel)
- throws HyracksDataException {
+ private void fallback(IFrameReader recurRunReader, int runLevel) throws HyracksDataException {
// fall back
FrameTupleAccessor runFrameTupleAccessor = new FrameTupleAccessor(frameSize, inRecDesc);
HybridHashSortGroupHashTable hhsTable = new HybridHashSortGroupHashTable(ctx, framesLimit, tableSize,
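
The new close()/finishAndRecursion() path above replaces the old per-run, page-based estimates with three cardinality-based ones: a bound on the recursion depth derived from the observed input size, an adjusted key-cardinality estimate based on what the in-memory part of the hash table actually absorbed, and a per-run estimate that spreads that cardinality over the spilled runs in proportion to their raw record counts. A minimal standalone restatement of those formulas, with illustrative method names that are not part of the Hyracks API:

    // Standalone restatement of the estimates used by close()/finishAndRecursion().
    class HybridHashEstimates {

        // Each recursion level cuts the input by roughly a factor of framesLimit, so
        // ceil(log_framesLimit(inputFrames * fudgeFactor)) + 1 levels suffice (at least 1).
        static int maxRecursiveLevels(int observedInputSizeInFrames, double fudgeFactor, int framesLimit) {
            return (int) Math.max(
                    Math.ceil(Math.log(observedInputSizeInFrames * fudgeFactor) / Math.log(framesLimit)) + 1, 1);
        }

        // Input adjustment: scale the unique-key/raw-record ratio observed on the directly
        // flushed (in-memory) records up to the whole raw input; if nothing was aggregated
        // in memory, keep the previous estimate.
        static long adjustedCardinality(int directFlushKeys, int directFlushRawRecords,
                long inputRawRecordCount, long previousEstimate) {
            if (directFlushRawRecords <= 0) {
                return previousEstimate;
            }
            return (long) Math.ceil((double) directFlushKeys / directFlushRawRecords * inputRawRecordCount);
        }

        // Per-run estimate: assume unique keys are distributed over the spilled runs in
        // proportion to each run's raw record count.
        static int runKeyCardinality(long inputCardinality, int subPartitionRawRecords, long inputRawRecordCount) {
            return (int) Math.ceil((double) inputCardinality * subPartitionRawRecords / inputRawRecordCount);
        }
    }

A run then falls back to the hash-sort path when its estimated key cardinality exceeds HYBRID_FALLBACK_THRESHOLD times the parent estimate, or when its level would exceed the estimated maximum recursion depth.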
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java
deleted file mode 100644
index 666c172..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-
-public class HybridHashPartitionGenerateFrameWriter implements IFrameWriter {
- private final IHyracksTaskContext ctx;
-
- private RunFileWriter[] partitions;
-
- private int[] partitionRawSizesInFrames;
- private int[] partitionRawSizesInTuples;
-
- private List<IFrameReader> partitionRunReaders;
- private List<Integer> partitionRunAggregatedPages;
- private List<Integer> partitionSizeInFrames;
- private List<Integer> partitionSizeInTuples;
-
- private ByteBuffer[] outputBuffers;
-
- private final int numOfPartitions;
-
- private final ITuplePartitionComputer tpc;
-
- private final FrameTupleAccessor inFrameTupleAccessor;
-
- private final FrameTupleAppender outFrameTupleAppender;
-
- public HybridHashPartitionGenerateFrameWriter(IHyracksTaskContext ctx, int numOfPartitions,
- ITuplePartitionComputer tpc, RecordDescriptor inRecDesc) {
- this.ctx = ctx;
- this.numOfPartitions = numOfPartitions;
- this.tpc = tpc;
- this.partitions = new RunFileWriter[numOfPartitions];
- this.outputBuffers = new ByteBuffer[numOfPartitions];
- this.partitionRawSizesInTuples = new int[numOfPartitions];
- this.partitionRawSizesInFrames = new int[numOfPartitions];
- this.inFrameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
- this.outFrameTupleAppender = new FrameTupleAppender(ctx.getFrameSize());
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#open()
- */
- @Override
- public void open() throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#nextFrame(java.nio.ByteBuffer)
- */
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- inFrameTupleAccessor.reset(buffer);
- int tupleCount = inFrameTupleAccessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- int pid = tpc.partition(inFrameTupleAccessor, i, numOfPartitions);
-
- ctx.getCounterContext().getCounter("optional.partition.insert.count", true).update(1);
-
- if (outputBuffers[pid] == null) {
- outputBuffers[pid] = ctx.allocateFrame();
- }
- outFrameTupleAppender.reset(outputBuffers[pid], false);
-
- if (!outFrameTupleAppender.append(inFrameTupleAccessor, i)) {
- // flush the output buffer
- if (partitions[pid] == null) {
- partitions[pid] = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
- HybridHashPartitionGenerateFrameWriter.class.getSimpleName()), ctx.getIOManager());
- partitions[pid].open();
- }
- FrameUtils.flushFrame(outputBuffers[pid], partitions[pid]);
- partitionRawSizesInFrames[pid]++;
- outFrameTupleAppender.reset(outputBuffers[pid], true);
- if (!outFrameTupleAppender.append(inFrameTupleAccessor, i)) {
- throw new HyracksDataException(
- "Failed to insert a record into its partition: the record size is too large. ");
- }
- }
- partitionRawSizesInTuples[pid]++;
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#fail()
- */
- @Override
- public void fail() throws HyracksDataException {
- throw new HyracksDataException("Failed on hash partitioning.");
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#close()
- */
- @Override
- public void close() throws HyracksDataException {
- outputBuffers = null;
- for (RunFileWriter partWriter : partitions) {
- if (partWriter != null)
- partWriter.close();
- }
- }
-
- public void finishup() throws HyracksDataException {
- for (int i = 0; i < outputBuffers.length; i++) {
- if (outputBuffers[i] == null) {
- continue;
- }
- if (partitions[i] == null) {
- partitions[i] = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
- HybridHashPartitionGenerateFrameWriter.class.getSimpleName()), ctx.getIOManager());
- partitions[i].open();
- }
- outFrameTupleAppender.reset(outputBuffers[i], false);
- if (outFrameTupleAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputBuffers[i], partitions[i]);
- partitionRawSizesInFrames[i]++;
- outputBuffers[i] = null;
- }
- }
-
- partitionRunReaders = new LinkedList<IFrameReader>();
- partitionSizeInFrames = new LinkedList<Integer>();
- partitionSizeInTuples = new LinkedList<Integer>();
- partitionRunAggregatedPages = new LinkedList<Integer>();
-
- for (int i = 0; i < numOfPartitions; i++) {
- if (partitions[i] != null) {
- partitionRunReaders.add(partitions[i].createReader());
- partitionRunAggregatedPages.add(0);
- partitions[i].close();
- partitionSizeInFrames.add(partitionRawSizesInFrames[i]);
- partitionSizeInTuples.add(partitionRawSizesInTuples[i]);
- }
- }
-
- }
-
- public List<IFrameReader> getSpilledRuns() throws HyracksDataException {
- return partitionRunReaders;
- }
-
- public List<Integer> getSpilledRunsSizeInPages() throws HyracksDataException {
- return partitionSizeInFrames;
- }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index d1fec29..fd4f8da 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -29,20 +29,26 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
public class PreclusteredGroupWriter implements IFrameWriter {
+
+ private final static int INT_SIZE = 4;
+
private final int[] groupFields;
private final IBinaryComparator[] comparators;
private final IAggregatorDescriptor aggregator;
private final AggregateState aggregateState;
private final IFrameWriter writer;
- private final ByteBuffer copyFrame;
private final FrameTupleAccessor inFrameAccessor;
- private final FrameTupleAccessor copyFrameAccessor;
private final ByteBuffer outFrame;
private final FrameTupleAppender appender;
private final ArrayTupleBuilder tupleBuilder;
- private boolean first;
+ private final RecordDescriptor outRecordDesc;
+
+ private byte[] groupResultCache;
+ private ByteBuffer groupResultCacheBuffer;
+ private FrameTupleAccessor groupResultCacheAccessor;
+ private FrameTupleAppender groupResultCacheAppender;
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
@@ -52,10 +58,9 @@
this.aggregator = aggregator;
this.aggregateState = aggregator.createAggregateStates();
this.writer = writer;
- copyFrame = ctx.allocateFrame();
+ this.outRecordDesc = outRecordDesc;
+
inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
- copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
- copyFrameAccessor.reset(copyFrame);
outFrame = ctx.allocateFrame();
appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -67,7 +72,6 @@
@Override
public void open() throws HyracksDataException {
writer.open();
- first = true;
}
@Override
@@ -75,40 +79,45 @@
inFrameAccessor.reset(buffer);
int nTuples = inFrameAccessor.getTupleCount();
for (int i = 0; i < nTuples; ++i) {
- if (first) {
- tupleBuilder.reset();
- for (int j = 0; j < groupFields.length; j++) {
- tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
- }
- aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
-
- first = false;
-
- } else {
- if (i == 0) {
- switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+ if (groupResultCache != null && groupResultCacheAccessor.getTupleCount() > 0) {
+ groupResultCacheAccessor.reset(ByteBuffer.wrap(groupResultCache));
+ if (sameGroup(inFrameAccessor, i, groupResultCacheAccessor, 0)) {
+ // find match: do aggregation
+ aggregator.aggregate(inFrameAccessor, i, groupResultCacheAccessor, 0, aggregateState);
+ continue;
} else {
- switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+ // write the cached group into the final output
+ writeOutput(groupResultCacheAccessor, 0);
}
-
}
- }
- FrameUtils.copy(buffer, copyFrame);
- }
-
- private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
- FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
- if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
- writeOutput(prevTupleAccessor, prevTupleIndex);
tupleBuilder.reset();
+
for (int j = 0; j < groupFields.length; j++) {
- tupleBuilder.addField(currTupleAccessor, currTupleIndex, groupFields[j]);
+ tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
}
- aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex, aggregateState);
- } else {
- aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
+
+ aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
+
+ // enlarge the cache buffer if necessary
+ int requiredSize = tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * INT_SIZE + 2
+ * INT_SIZE;
+
+ if (groupResultCache == null || groupResultCache.length < requiredSize) {
+ groupResultCache = new byte[requiredSize];
+ groupResultCacheAppender = new FrameTupleAppender(groupResultCache.length);
+ groupResultCacheBuffer = ByteBuffer.wrap(groupResultCache);
+ groupResultCacheAccessor = new FrameTupleAccessor(groupResultCache.length, outRecordDesc);
+ }
+
+ groupResultCacheAppender.reset(groupResultCacheBuffer, true);
+ if (!groupResultCacheAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new HyracksDataException("The partial result is too large to be initialized in a frame.");
+ }
+
+ groupResultCacheAccessor.reset(groupResultCacheBuffer);
}
}
@@ -117,7 +126,7 @@
tupleBuilder.reset();
for (int j = 0; j < groupFields.length; j++) {
- tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
+ tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, j);
}
aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
@@ -138,8 +147,8 @@
int fIdx = groupFields[i];
int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
int l1 = a1.getFieldLength(t1Idx, fIdx);
- int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
- int l2 = a2.getFieldLength(t2Idx, fIdx);
+ int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, i);
+ int l2 = a2.getFieldLength(t2Idx, i);
if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
return false;
}
@@ -154,8 +163,8 @@
@Override
public void close() throws HyracksDataException {
- if (!first) {
- writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+ if (groupResultCache != null && groupResultCacheAccessor.getTupleCount() > 0) {
+ writeOutput(groupResultCacheAccessor, 0);
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
}