[ASTERIXDB-2552][RT] Refactor runs generator and merger
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Moved the writer and sorter out of the merger so that the
micro external sort can reuse the run generator and merger.
Also includes minor clean-ups.
Change-Id: Idda31c92cbcddba5ebef8bbbf7855b9c8293dd51
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3363
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
index 364f1c7..cb2da4c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
@@ -32,7 +32,6 @@
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
-import org.apache.hyracks.dataflow.std.sort.ISorter;
/**
* Group-by aggregation is pushed into multi-pass merge of external sort.
@@ -44,28 +43,23 @@
private final RecordDescriptor inputRecordDesc;
private final RecordDescriptor partialAggRecordDesc;
private final RecordDescriptor outRecordDesc;
-
private final int[] groupFields;
private final IAggregatorDescriptorFactory mergeAggregatorFactory;
private final IAggregatorDescriptorFactory partialAggregatorFactory;
private final boolean localSide;
-
private final int[] mergeSortFields;
private final int[] mergeGroupFields;
private final IBinaryComparator[] groupByComparators;
- public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, ISorter frameSorter, List<GeneratedRunFileReader> runs,
- int[] sortFields, RecordDescriptor inRecordDesc, RecordDescriptor partialAggRecordDesc,
- RecordDescriptor outRecordDesc, int framesLimit, IFrameWriter writer, int[] groupFields,
- INormalizedKeyComputer nmk, IBinaryComparator[] comparators,
+ public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs, int[] sortFields,
+ RecordDescriptor inRecordDesc, RecordDescriptor partialAggRecordDesc, RecordDescriptor outRecordDesc,
+ int framesLimit, int[] groupFields, INormalizedKeyComputer nmk, IBinaryComparator[] comparators,
IAggregatorDescriptorFactory partialAggregatorFactory, IAggregatorDescriptorFactory aggregatorFactory,
boolean localStage) {
- super(ctx, frameSorter, runs, comparators, nmk, partialAggRecordDesc, framesLimit, writer);
-
+ super(ctx, runs, comparators, nmk, partialAggRecordDesc, framesLimit);
this.inputRecordDesc = inRecordDesc;
this.partialAggRecordDesc = partialAggRecordDesc;
this.outRecordDesc = outRecordDesc;
-
this.groupFields = groupFields;
this.mergeAggregatorFactory = aggregatorFactory;
this.partialAggregatorFactory = partialAggregatorFactory;
@@ -93,11 +87,10 @@
}
@Override
- protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+ public IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
IAggregatorDescriptorFactory aggregatorFactory = localSide ? partialAggregatorFactory : mergeAggregatorFactory;
- boolean outputPartial = false;
return new PreclusteredGroupWriter(ctx, groupFields, groupByComparators, aggregatorFactory, inputRecordDesc,
- outRecordDesc, nextWriter, outputPartial);
+ outRecordDesc, nextWriter, false);
}
@Override
@@ -110,16 +103,14 @@
protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
throws HyracksDataException {
IAggregatorDescriptorFactory aggregatorFactory = localSide ? mergeAggregatorFactory : partialAggregatorFactory;
- boolean outputPartial = true;
return new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregatorFactory,
- partialAggRecordDesc, partialAggRecordDesc, mergeFileWriter, outputPartial);
+ partialAggRecordDesc, partialAggRecordDesc, mergeFileWriter, true);
}
@Override
- protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
- boolean outputPartial = false;
+ public IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
return new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, mergeAggregatorFactory,
- partialAggRecordDesc, outRecordDesc, nextWriter, outputPartial);
+ partialAggRecordDesc, outRecordDesc, nextWriter, false);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index 23e47f0..7ca01a5 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -20,7 +20,6 @@
import java.util.List;
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -37,7 +36,6 @@
import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.Algorithm;
-import org.apache.hyracks.dataflow.std.sort.ISorter;
/**
* This Operator pushes group-by aggregation into the external sort.
@@ -158,13 +156,12 @@
@Override
protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
- List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators,
- INormalizedKeyComputer nmkComputer, int necessaryFrames) {
- return new ExternalSortGroupByRunMerger(ctx, sorter, runs, sortFields,
+ IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs,
+ IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+ return new ExternalSortGroupByRunMerger(ctx, runs, sortFields,
recordDescProvider.getInputRecordDescriptor(new ActivityId(odId, SORT_ACTIVITY_ID), 0),
- partialAggRecordDesc, outputRecordDesc, necessaryFrames, writer, groupFields, nmkComputer,
- comparators, partialAggregatorFactory, mergeAggregatorFactory, !finalStage);
+ partialAggRecordDesc, outputRecordDesc, necessaryFrames, groupFields, nmkComputer, comparators,
+ partialAggregatorFactory, mergeAggregatorFactory, !finalStage);
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index 0bead97..e860288 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -43,8 +43,6 @@
public abstract class AbstractExternalSortRunMerger {
protected final IHyracksTaskContext ctx;
- protected final IFrameWriter writer;
-
private final List<GeneratedRunFileReader> runs;
private final BitSet currentGenerationRunAvailable;
private final IBinaryComparator[] comparators;
@@ -54,136 +52,93 @@
private final int topK;
private List<GroupVSizeFrame> inFrames;
private VSizeFrame outputFrame;
- private ISorter sorter;
-
private static final Logger LOGGER = LogManager.getLogger();
- public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<GeneratedRunFileReader> runs,
+ public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
- int framesLimit, IFrameWriter writer) {
- this(ctx, sorter, runs, comparators, nmkComputer, recordDesc, framesLimit, Integer.MAX_VALUE, writer);
+ int framesLimit) {
+ this(ctx, runs, comparators, nmkComputer, recordDesc, framesLimit, Integer.MAX_VALUE);
}
- public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<GeneratedRunFileReader> runs,
+ AbstractExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
- int framesLimit, int topK, IFrameWriter writer) {
+ int framesLimit, int topK) {
this.ctx = ctx;
- this.sorter = sorter;
this.runs = new LinkedList<>(runs);
this.currentGenerationRunAvailable = new BitSet(runs.size());
this.comparators = comparators;
this.nmkComputer = nmkComputer;
this.recordDesc = recordDesc;
this.framesLimit = framesLimit;
- this.writer = writer;
this.topK = topK;
}
- public void process() throws HyracksDataException {
- IFrameWriter finalWriter = null;
+ public void process(IFrameWriter finalWriter) throws HyracksDataException {
try {
- if (runs.isEmpty()) {
- finalWriter = prepareSkipMergingFinalResultWriter(writer);
- finalWriter.open();
- if (sorter != null) {
- try {
- if (sorter.hasRemaining()) {
- sorter.flush(finalWriter);
- }
- } finally {
- sorter.close();
- }
- }
- } else {
- /** recycle sort buffer */
- if (sorter != null) {
- sorter.close();
- }
+ int maxMergeWidth = framesLimit - 1;
+ inFrames = new ArrayList<>(maxMergeWidth);
+ outputFrame = new VSizeFrame(ctx);
+ List<GeneratedRunFileReader> partialRuns = new ArrayList<>(maxMergeWidth);
+ int stop = runs.size();
+ currentGenerationRunAvailable.set(0, stop);
+ int numberOfPasses = 1;
+ while (true) {
+ int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns,
+ currentGenerationRunAvailable, stop);
+ prepareFrames(unUsed, inFrames, partialRuns);
- finalWriter = prepareFinalMergeResultWriter(writer);
- finalWriter.open();
-
- int maxMergeWidth = framesLimit - 1;
-
- inFrames = new ArrayList<>(maxMergeWidth);
- outputFrame = new VSizeFrame(ctx);
- List<GeneratedRunFileReader> partialRuns = new ArrayList<>(maxMergeWidth);
-
- int stop = runs.size();
- currentGenerationRunAvailable.set(0, stop);
- int numberOfPasses = 1;
- while (true) {
-
- int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns,
- currentGenerationRunAvailable, stop);
- prepareFrames(unUsed, inFrames, partialRuns);
-
- if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) {
- GeneratedRunFileReader reader;
- if (partialRuns.size() == 1) {
- if (!currentGenerationRunAvailable.isEmpty()) {
- throw new HyracksDataException(
- "The record is too big to put into the merging frame, please"
- + " allocate more sorting memory");
- } else {
- reader = partialRuns.get(0);
- }
-
+ if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) {
+ GeneratedRunFileReader reader;
+ if (partialRuns.size() == 1) {
+ if (!currentGenerationRunAvailable.isEmpty()) {
+ throw new HyracksDataException("The record is too big to put into the merging frame, please"
+ + " allocate more sorting memory");
} else {
- RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
- IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
-
- try {
- mergeResultWriter.open();
- merge(mergeResultWriter, partialRuns);
- } catch (Throwable t) {
- mergeResultWriter.fail();
- throw t;
- } finally {
- mergeResultWriter.close();
- }
- reader = mergeFileWriter.createReader();
- }
- runs.add(reader);
-
- if (currentGenerationRunAvailable.isEmpty()) {
- numberOfPasses++;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("generated runs:" + stop);
- }
- runs.subList(0, stop).clear();
- currentGenerationRunAvailable.clear();
- currentGenerationRunAvailable.set(0, runs.size());
- stop = runs.size();
+ reader = partialRuns.get(0);
}
} else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("final runs: {}", stop);
- LOGGER.debug("number of passes: " + numberOfPasses);
+ RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
+ IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
+
+ try {
+ mergeResultWriter.open();
+ merge(mergeResultWriter, partialRuns);
+ } catch (Throwable t) {
+ mergeResultWriter.fail();
+ throw t;
+ } finally {
+ mergeResultWriter.close();
}
- merge(finalWriter, partialRuns);
- break;
+ reader = mergeFileWriter.createReader();
}
- }
- }
- } catch (Exception e) {
- if (finalWriter != null) {
- finalWriter.fail();
- }
- throw HyracksDataException.create(e);
- } finally {
- try {
- if (finalWriter != null) {
- finalWriter.close();
- }
- } finally {
- for (RunFileReader reader : runs) {
- try {
- reader.close(); // close is idempotent.
- } catch (Exception e) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.log(Level.WARN, e.getMessage(), e);
+ runs.add(reader);
+
+ if (currentGenerationRunAvailable.isEmpty()) {
+ numberOfPasses++;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("generated runs:" + stop);
}
+ runs.subList(0, stop).clear();
+ currentGenerationRunAvailable.clear();
+ currentGenerationRunAvailable.set(0, runs.size());
+ stop = runs.size();
+ }
+ } else {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("final runs: {}", stop);
+ LOGGER.debug("number of passes: " + numberOfPasses);
+ }
+ merge(finalWriter, partialRuns);
+ break;
+ }
+ }
+ } finally {
+ for (RunFileReader reader : runs) {
+ try {
+ reader.close(); // close is idempotent.
+ } catch (Exception e) {
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.log(Level.WARN, e.getMessage(), e);
}
}
}
@@ -237,18 +192,6 @@
}
}
- protected abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
- throws HyracksDataException;
-
- protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException;
-
- protected abstract IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
- throws HyracksDataException;
-
- protected abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException;
-
- protected abstract int[] getSortFields();
-
private void merge(IFrameWriter writer, List<GeneratedRunFileReader> partialRuns) throws HyracksDataException {
RunMergingFrameReader merger = new RunMergingFrameReader(ctx, partialRuns, inFrames, getSortFields(),
comparators, nmkComputer, recordDesc, topK);
@@ -267,4 +210,16 @@
}
}
+ public abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
+ throws HyracksDataException;
+
+ protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException;
+
+ protected abstract IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
+ throws HyracksDataException;
+
+ public abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException;
+
+ protected abstract int[] getSortFields();
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index 980ad9b..74223d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -79,14 +79,6 @@
private final BufferInfo info = new BufferInfo(null, -1, -1);
public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
- int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
- throws HyracksDataException {
- this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
- recordDescriptor, Integer.MAX_VALUE);
- }
-
- public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
int[] sortFields, INormalizedKeyComputerFactory[] normalizedKeyComputerFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
throws HyracksDataException {
@@ -286,23 +278,10 @@
return 0;
}
- protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
- System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize);
- System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize, ptrSize);
- System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize);
- }
-
- protected void copy(int src[], int srcPos, int dest[], int destPos) {
- System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, ptrSize);
- }
-
- protected void copy(int src[], int srcPos, int dest[], int destPos, int n) {
- System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n * ptrSize);
- }
-
@Override
public void close() {
tupleCount = 0;
+ totalMemoryUsed = 0;
bufferManager.close();
tPointers = null;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index 3c11669..3f0b7c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -28,13 +28,18 @@
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
public abstract class AbstractSortRunGenerator implements IRunGenerator {
- protected final List<GeneratedRunFileReader> generatedRunFileReaders;
+
+ private final List<GeneratedRunFileReader> generatedRunFileReaders;
public AbstractSortRunGenerator() {
generatedRunFileReaders = new LinkedList<>();
}
- abstract public ISorter getSorter() throws HyracksDataException;
+ /**
* Null could be returned. Caller should check if it is not null.
+ * @return the sorter associated with the run generator or null if there is no sorter.
+ */
+ abstract public ISorter getSorter();
@Override
public void open() throws HyracksDataException {
@@ -43,9 +48,10 @@
@Override
public void close() throws HyracksDataException {
- if (getSorter().hasRemaining()) {
+ ISorter sorter = getSorter();
+ if (sorter != null && sorter.hasRemaining()) {
if (generatedRunFileReaders.size() <= 0) {
- getSorter().sort();
+ sorter.sort();
} else {
flushFramesToRun();
}
@@ -56,13 +62,15 @@
abstract protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException;
- protected void flushFramesToRun() throws HyracksDataException {
- getSorter().sort();
+ // assumption is that there will always be a sorter (i.e. sorter is not null)
+ void flushFramesToRun() throws HyracksDataException {
+ ISorter sorter = getSorter();
+ sorter.sort();
RunFileWriter runWriter = getRunFileWriter();
IFrameWriter flushWriter = getFlushableFrameWriter(runWriter);
flushWriter.open();
try {
- getSorter().flush(flushWriter);
+ sorter.flush(flushWriter);
} catch (Exception e) {
flushWriter.fail();
throw e;
@@ -70,7 +78,7 @@
flushWriter.close();
}
generatedRunFileReaders.add(runWriter.createDeleteOnCloseReader());
- getSorter().reset();
+ sorter.reset();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index 406703e..6abc064 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -49,12 +49,9 @@
public abstract class AbstractSorterOperatorDescriptor extends AbstractOperatorDescriptor {
private static final Logger LOGGER = LogManager.getLogger();
-
private static final long serialVersionUID = 1L;
-
protected static final int SORT_ACTIVITY_ID = 0;
protected static final int MERGE_ACTIVITY_ID = 1;
-
protected final int[] sortFields;
protected final INormalizedKeyComputerFactory[] keyNormalizerFactories;
protected final IBinaryComparatorFactory[] comparatorFactories;
@@ -90,10 +87,10 @@
}
public static class SortTaskState extends AbstractStateObject {
- public List<GeneratedRunFileReader> generatedRunFileReaders;
- public ISorter sorter;
+ List<GeneratedRunFileReader> generatedRunFileReaders;
+ ISorter sorter;
- public SortTaskState(JobId jobId, TaskId taskId) {
+ SortTaskState(JobId jobId, TaskId taskId) {
super(jobId, taskId);
}
}
@@ -101,7 +98,7 @@
protected abstract class SortActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public SortActivity(ActivityId id) {
+ protected SortActivity(ActivityId id) {
super(id);
}
@@ -111,7 +108,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
private AbstractSortRunGenerator runGen;
@Override
@@ -143,26 +140,24 @@
runGen.fail();
}
};
- return op;
}
}
protected abstract class MergeActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public MergeActivity(ActivityId id) {
+ protected MergeActivity(ActivityId id) {
super(id);
}
protected abstract AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
- List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
- int necessaryFrames);
+ IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs,
+ IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames);
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
@@ -176,13 +171,39 @@
}
INormalizedKeyComputer nmkComputer = keyNormalizerFactories == null ? null
: keyNormalizerFactories[0].createNormalizedKeyComputer();
- AbstractExternalSortRunMerger merger = getSortRunMerger(ctx, recordDescProvider, writer, sorter,
- runs, comparators, nmkComputer, framesLimit);
- merger.process();
+ AbstractExternalSortRunMerger merger =
+ getSortRunMerger(ctx, recordDescProvider, runs, comparators, nmkComputer, framesLimit);
+ IFrameWriter wrappingWriter = null;
+ try {
+ if (runs.isEmpty()) {
+ wrappingWriter = merger.prepareSkipMergingFinalResultWriter(writer);
+ wrappingWriter.open();
+ if (sorter.hasRemaining()) {
+ sorter.flush(wrappingWriter);
+ }
+ } else {
+ // eagerly close the sorter here to release memory rather than in finally
+ sorter.close();
+ sorter = null;
+ wrappingWriter = merger.prepareFinalMergeResultWriter(writer);
+ wrappingWriter.open();
+ merger.process(wrappingWriter);
+ }
+ } catch (Throwable e) {
+ if (wrappingWriter != null) {
+ wrappingWriter.fail();
+ }
+ throw HyracksDataException.create(e);
+ } finally {
+ if (sorter != null) {
+ sorter.close();
+ }
+ if (wrappingWriter != null) {
+ wrappingWriter.close();
+ }
+ }
}
};
- return op;
}
}
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index b58d4c7..8b80a26 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -20,7 +20,6 @@
import java.util.List;
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -91,12 +90,11 @@
private static final long serialVersionUID = 1L;
@Override
- protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
- List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators,
- INormalizedKeyComputer nmkComputer, int necessaryFrames) {
- return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators, nmkComputer,
- outRecDescs[0], necessaryFrames, outputLimit, writer);
+ protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs,
+ IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+ return new ExternalSortRunMerger(ctx, runs, sortFields, comparators, nmkComputer, outRecDescs[0],
+ necessaryFrames, outputLimit);
}
};
}
@@ -113,7 +111,7 @@
RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy, int outputLimit) {
super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
if (framesLimit <= 1) {
- throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
+ throw new IllegalStateException();// minimum of 2 frames (1 in,1 out)
}
this.alg = alg;
this.policy = policy;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 2b985b9..fb32b0f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -34,15 +34,15 @@
private final int[] sortFields;
- public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<GeneratedRunFileReader> runs,
- int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
- RecordDescriptor recordDesc, int framesLimit, int topK, IFrameWriter writer) {
- super(ctx, sorter, runs, comparators, nmkComputer, recordDesc, framesLimit, topK, writer);
+ public ExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs, int[] sortFields,
+ IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
+ int framesLimit, int topK) {
+ super(ctx, runs, comparators, nmkComputer, recordDesc, framesLimit, topK);
this.sortFields = sortFields;
}
@Override
- protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+ public IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
return nextWriter;
}
@@ -59,7 +59,7 @@
}
@Override
- protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+ public IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
return nextWriter;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
index 260b665..92b7d7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
@@ -116,4 +116,12 @@
copy(tPointers, pos2, tPointersTemp, targetPos, rest);
}
}
+
+ private void copy(int src[], int srcPos, int dest[], int destPos) {
+ System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, ptrSize);
+ }
+
+ private void copy(int src[], int srcPos, int dest[], int destPos, int n) {
+ System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n * ptrSize);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
index 486bc7c..ddef0d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
@@ -25,17 +25,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
-public class FrameSorterQuickSort extends AbstractFrameSorter {
+class FrameSorterQuickSort extends AbstractFrameSorter {
- public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
- int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
- throws HyracksDataException {
- this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
- recordDescriptor, Integer.MAX_VALUE);
- }
-
- public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+ FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
throws HyracksDataException {
@@ -48,7 +40,7 @@
sort(0, tupleCount);
}
- void sort(int offset, int length) throws HyracksDataException {
+ private void sort(int offset, int length) throws HyracksDataException {
int m = offset + (length >> 1);
int a = offset;
@@ -102,4 +94,9 @@
}
}
+ private void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
+ System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize);
+ System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize, ptrSize);
+ System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
index 1578975..d8431f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
@@ -45,7 +45,7 @@
protected final IBinaryComparatorFactory[] comparatorFactories;
protected final RecordDescriptor recordDescriptor;
protected ITupleSorter tupleSorter;
- protected IFrameTupleAccessor inAccessor;
+ protected final IFrameTupleAccessor inAccessor;
public HeapSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
@@ -71,7 +71,7 @@
}
// Accessor that returns the tupleSorter field typed as ISorter.
// The '-' line is the previous signature; the '+' line is the same method without the
// "throws HyracksDataException" clause (the body only returns a field).
@Override
- public ISorter getSorter() throws HyracksDataException {
+ public ISorter getSorter() {
return tupleSorter;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
index 180ecbc..3b07017 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
@@ -49,7 +49,7 @@
}
@Override
- public ISorter getSorter() throws HyracksDataException {
+ public ISorter getSorter() {
if (tupleSorter != null) {
return tupleSorter;
} else if (frameSorter != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index dea770a..b29057f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -21,7 +21,6 @@
import java.util.List;
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -47,7 +46,7 @@
comparatorFactories, recordDescriptor);
}
- public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
+ private TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) {
super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
@@ -75,12 +74,11 @@
private static final long serialVersionUID = 1L;
// Creates the ExternalSortRunMerger used by this operator.
// '-' lines: the previous version, which also received an IFrameWriter and an ISorter and forwarded
// both to the ExternalSortRunMerger constructor.
// '+' lines: the updated version, which drops those two parameters, declares
// AbstractExternalSortRunMerger as its return type, and passes ctx, runs, sortFields, comparators,
// nmkComputer, outRecDescs[0], necessaryFrames and topK to the constructor.
@Override
- protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
- List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators,
- INormalizedKeyComputer nmkComputer, int necessaryFrames) {
- return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators, nmkComputer,
- outRecDescs[0], necessaryFrames, topK, writer);
+ protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs,
+ IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+ return new ExternalSortRunMerger(ctx, runs, sortFields, comparators, nmkComputer, outRecDescs[0],
+ necessaryFrames, topK);
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
index bcf661f..8fa570c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
@@ -57,10 +57,33 @@
}
INormalizedKeyComputer nmkComputer = normalizedKeyComputerFactory == null ? null
: normalizedKeyComputerFactory.createNormalizedKeyComputer();
- AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, sorter, runs, keyFields,
- inRecordDesc, outputRec, outputRec, numFrames, writer, keyFields, nmkComputer, comparators,
+ AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, runs, keyFields,
+ inRecordDesc, outputRec, outputRec, numFrames, keyFields, nmkComputer, comparators,
partialAggrInState, finalAggrInState, true);
- merger.process();
+ // Drive the final phase by hand: the merger only supplies the wrapping writer, while this
+ // code owns the open/fail/close lifecycle of that writer and closes the sorter.
+ IFrameWriter wrappingWriter = null;
+ try {
+     if (runs.isEmpty()) {
+         // No run files were produced: skip merging and flush whatever the sorter still holds.
+         wrappingWriter = merger.prepareSkipMergingFinalResultWriter(writer);
+         wrappingWriter.open();
+         if (sorter.hasRemaining()) {
+             sorter.flush(wrappingWriter);
+         }
+     } else {
+         // Run files exist: let the merger write the merged result through the wrapping writer.
+         wrappingWriter = merger.prepareFinalMergeResultWriter(writer);
+         wrappingWriter.open();
+         merger.process(wrappingWriter);
+     }
+ } catch (Throwable e) {
+     // Signal the failure downstream before rethrowing it as a HyracksDataException.
+     if (wrappingWriter != null) {
+         wrappingWriter.fail();
+     }
+     throw HyracksDataException.create(e);
+ } finally {
+     // Nested finally: the writer must still be closed when sorter.close() itself throws.
+     try {
+         sorter.close();
+     } finally {
+         if (wrappingWriter != null) {
+             wrappingWriter.close();
+         }
+     }
+ }
}
};
}