support both quick sort and merge sort as in-memory sort algorithms, but merge sort is the default one
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index b299e78..131eea0 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -24,7 +24,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorter;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterMergeSort;
public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -52,12 +52,12 @@
return new AbstractOneInputOneOutputPushRuntime() {
- FrameSorter frameSorter = null;
+ FrameSorterMergeSort frameSorter = null;
@Override
public void open() throws HyracksDataException {
if (frameSorter == null) {
- frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+ frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
outputRecordDesc);
}
frameSorter.reset();
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
index b8e9ec0..31cd29b 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -43,6 +43,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
@@ -94,7 +95,7 @@
public void initBlock(int blockId) throws HyracksDataException {
runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
- helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit);
+ helper.getMapOutputRecordDescriptorWithoutExtraFields(), Algorithm.MERGE_SORT, framesLimit);
this.blockId = blockId;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Algorithm.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Algorithm.java
new file mode 100644
index 0000000..5c96bd0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Algorithm.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+public enum Algorithm {
+ QUICK_SORT,
+ MERGE_SORT
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index f071b16..e1315e7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -52,6 +52,15 @@
private final IBinaryComparatorFactory[] comparatorFactories;
private final int framesLimit;
+ private Algorithm alg = Algorithm.MERGE_SORT;
+
+ public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, Algorithm alg) {
+ this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+ this.alg = alg;
+ }
+
public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
this(spec, framesLimit, sortFields, null, comparatorFactories, recordDescriptor);
@@ -87,7 +96,7 @@
public static class SortTaskState extends AbstractStateObject {
private List<IFrameReader> runs;
- private FrameSorter frameSorter;
+ private IFrameSorter frameSorter;
public SortTaskState() {
}
@@ -123,7 +132,7 @@
@Override
public void open() throws HyracksDataException {
runGen = new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDescriptors[0], framesLimit);
+ comparatorFactories, recordDescriptors[0], alg, framesLimit);
runGen.open();
}
@@ -167,7 +176,7 @@
SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
SORT_ACTIVITY_ID), partition));
List<IFrameReader> runs = state.runs;
- FrameSorter frameSorter = state.frameSorter;
+ IFrameSorter frameSorter = state.frameSorter;
IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index b149e30..3736fca 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -30,15 +30,21 @@
public class ExternalSortRunGenerator implements IFrameWriter {
private final IHyracksTaskContext ctx;
- private final FrameSorter frameSorter;
+ private final IFrameSorter frameSorter;
private final List<IFrameReader> runs;
private final int maxSortFrames;
public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDesc, int framesLimit) throws HyracksDataException {
+ RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException {
this.ctx = ctx;
- frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc);
+ if (alg == Algorithm.MERGE_SORT) {
+ frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+ recordDesc);
+ } else {
+ frameSorter = new FrameSorterQuickSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+ recordDesc);
+ }
runs = new LinkedList<IFrameReader>();
maxSortFrames = framesLimit - 1;
}
@@ -87,7 +93,7 @@
public void fail() throws HyracksDataException {
}
- public FrameSorter getFrameSorter() {
+ public IFrameSorter getFrameSorter() {
return frameSorter;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 77efe89..eaf4162 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -68,8 +68,8 @@
private ByteBuffer outFrame;
private FrameTupleAppender outFrameAppender;
- private FrameSorter frameSorter; // Used in External sort, no replacement
- // selection
+ private IFrameSorter frameSorter; // Used in External sort, no replacement
+ // selection
private FrameTupleAccessor outFrameAccessor; // Used in External sort, with
// replacement selection
private final int outputLimit; // Used in External sort, with replacement
@@ -78,7 +78,7 @@
// selection and limit on output size
// Constructor for external sort, no replacement selection
- public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<IFrameReader> runs,
+ public ExternalSortRunMerger(IHyracksTaskContext ctx, IFrameSorter frameSorter, List<IFrameReader> runs,
int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
this.ctx = ctx;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
similarity index 97%
rename from hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
rename to hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
index a6bb4e2..cc0f1ef 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
@@ -31,7 +31,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
-public class FrameSorter {
+public class FrameSorterMergeSort implements IFrameSorter {
private final IHyracksTaskContext ctx;
private final int[] sortFields;
private final INormalizedKeyComputer nkc;
@@ -50,7 +50,7 @@
private int[] tPointersTemp;
private int tupleCount;
- public FrameSorter(IHyracksTaskContext ctx, int[] sortFields,
+ public FrameSorterMergeSort(IHyracksTaskContext ctx, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) throws HyracksDataException {
this.ctx = ctx;
@@ -69,15 +69,18 @@
dataFrameCount = 0;
}
+ @Override
public void reset() {
dataFrameCount = 0;
tupleCount = 0;
}
+ @Override
public int getFrameCount() {
return dataFrameCount;
}
+ @Override
public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
ByteBuffer copyFrame;
if (dataFrameCount == buffers.size()) {
@@ -90,6 +93,7 @@
++dataFrameCount;
}
+ @Override
public void sortFrames() {
int nBuffers = dataFrameCount;
tupleCount = 0;
@@ -123,6 +127,7 @@
}
}
+ @Override
public void flushFrames(IFrameWriter writer) throws HyracksDataException {
appender.reset(outFrame, true);
for (int ptr = 0; ptr < tupleCount; ++ptr) {
@@ -241,6 +246,7 @@
return 0;
}
+ @Override
public void close() {
this.buffers.clear();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
similarity index 71%
copy from hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
copy to hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
index a6bb4e2..083f4a7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
@@ -29,9 +29,8 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
-public class FrameSorter {
+public class FrameSorterQuickSort implements IFrameSorter {
private final IHyracksTaskContext ctx;
private final int[] sortFields;
private final INormalizedKeyComputer nkc;
@@ -47,10 +46,9 @@
private int dataFrameCount;
private int[] tPointers;
- private int[] tPointersTemp;
private int tupleCount;
- public FrameSorter(IHyracksTaskContext ctx, int[] sortFields,
+ public FrameSorterQuickSort(IHyracksTaskContext ctx, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) throws HyracksDataException {
this.ctx = ctx;
@@ -69,15 +67,18 @@
dataFrameCount = 0;
}
+ @Override
public void reset() {
dataFrameCount = 0;
tupleCount = 0;
}
+ @Override
public int getFrameCount() {
return dataFrameCount;
}
+ @Override
public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
ByteBuffer copyFrame;
if (dataFrameCount == buffers.size()) {
@@ -90,6 +91,7 @@
++dataFrameCount;
}
+ @Override
public void sortFrames() {
int nBuffers = dataFrameCount;
tupleCount = 0;
@@ -118,11 +120,11 @@
}
}
if (tupleCount > 0) {
- tPointersTemp = new int[tPointers.length];
- sort(0, tupleCount);
+ sort(tPointers, 0, tupleCount);
}
}
+ @Override
public void flushFrames(IFrameWriter writer) throws HyracksDataException {
appender.reset(outFrame, true);
for (int ptr = 0; ptr < tupleCount; ++ptr) {
@@ -145,73 +147,75 @@
}
}
- private void sort(int offset, int length) {
- int step = 1;
- int len = length;
- int end = offset + len;
- /** bottom-up merge */
- while (step < len) {
- /** merge */
- for (int i = offset; i < end; i += 2 * step) {
- int next = i + step;
- if (next < end) {
- merge(i, next, step, Math.min(step, end - next));
- } else {
- System.arraycopy(tPointers, i * 4, tPointersTemp, i * 4, (end - i) * 4);
+ private void sort(int[] tPointers, int offset, int length) {
+ int m = offset + (length >> 1);
+ int mi = tPointers[m * 4];
+ int mj = tPointers[m * 4 + 1];
+ int mv = tPointers[m * 4 + 3];
+
+ int a = offset;
+ int b = a;
+ int c = offset + length - 1;
+ int d = c;
+ while (true) {
+ while (b <= c) {
+ int cmp = compare(tPointers, b, mi, mj, mv);
+ if (cmp > 0) {
+ break;
}
+ if (cmp == 0) {
+ swap(tPointers, a++, b);
+ }
+ ++b;
}
- /** prepare next phase merge */
- step *= 2;
- int[] tmp = tPointersTemp;
- tPointersTemp = tPointers;
- tPointers = tmp;
- }
- }
-
- /** Merge two subarrays into one */
- private void merge(int start1, int start2, int len1, int len2) {
- int targetPos = start1;
- int pos1 = start1;
- int pos2 = start2;
- int end1 = start1 + len1 - 1;
- int end2 = start2 + len2 - 1;
- while (pos1 <= end1 && pos2 <= end2) {
- int cmp = compare(pos1, pos2);
- if (cmp <= 0) {
- copy(pos1, targetPos);
- pos1++;
- } else {
- copy(pos2, targetPos);
- pos2++;
+ while (c >= b) {
+ int cmp = compare(tPointers, c, mi, mj, mv);
+ if (cmp < 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, c, d--);
+ }
+ --c;
}
- targetPos++;
+ if (b > c)
+ break;
+ swap(tPointers, b++, c--);
}
- if (pos1 <= end1) {
- int rest = end1 - pos1 + 1;
- System.arraycopy(tPointers, pos1 * 4, tPointersTemp, targetPos * 4, rest * 4);
+
+ int s;
+ int n = offset + length;
+ s = Math.min(a - offset, b - a);
+ vecswap(tPointers, offset, b - s, s);
+ s = Math.min(d - c, n - d - 1);
+ vecswap(tPointers, b, n - s, s);
+
+ if ((s = b - a) > 1) {
+ sort(tPointers, offset, s);
}
- if (pos2 <= end2) {
- int rest = end2 - pos2 + 1;
- System.arraycopy(tPointers, pos2 * 4, tPointersTemp, targetPos * 4, rest * 4);
+ if ((s = d - c) > 1) {
+ sort(tPointers, n - s, s);
}
}
- private void copy(int src, int dest) {
- tPointersTemp[dest * 4] = tPointers[src * 4];
- tPointersTemp[dest * 4 + 1] = tPointers[src * 4 + 1];
- tPointersTemp[dest * 4 + 2] = tPointers[src * 4 + 2];
- tPointersTemp[dest * 4 + 3] = tPointers[src * 4 + 3];
+ private void swap(int x[], int a, int b) {
+ for (int i = 0; i < 4; ++i) {
+ int t = x[a * 4 + i];
+ x[a * 4 + i] = x[b * 4 + i];
+ x[b * 4 + i] = t;
+ }
}
- private int compare(int tp1, int tp2) {
+ private void vecswap(int x[], int a, int b, int n) {
+ for (int i = 0; i < n; i++, a++, b++) {
+ swap(x, a, b);
+ }
+ }
+
+ private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {
int i1 = tPointers[tp1 * 4];
int j1 = tPointers[tp1 * 4 + 1];
int v1 = tPointers[tp1 * 4 + 3];
-
- int tp2i = tPointers[tp2 * 4];
- int tp2j = tPointers[tp2 * 4 + 1];
- int tp2v = tPointers[tp2 * 4 + 3];
-
if (v1 != tp2v) {
return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
}
@@ -225,12 +229,12 @@
fta2.reset(buf2);
for (int f = 0; f < comparators.length; ++f) {
int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(buf1.array(), j1 + (fIdx - 1) * 4);
- int f1End = IntSerDeUtils.getInt(buf1.array(), j1 + fIdx * 4);
+ int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);
+ int f1End = buf1.getInt(j1 + fIdx * 4);
int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(buf2.array(), j2 + (fIdx - 1) * 4);
- int f2End = IntSerDeUtils.getInt(buf2.array(), j2 + fIdx * 4);
+ int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);
+ int f2End = buf2.getInt(j2 + fIdx * 4);
int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
int l2 = f2End - f2Start;
int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
@@ -241,7 +245,8 @@
return 0;
}
+ @Override
public void close() {
this.buffers.clear();
}
-}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
new file mode 100644
index 0000000..6778852
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFrameSorter {
+
+ public void reset();
+
+ public int getFrameCount();
+
+ public void insertFrame(ByteBuffer buffer) throws HyracksDataException;
+
+ public void sortFrames();
+
+ public void flushFrames(IFrameWriter writer) throws HyracksDataException;
+
+ public void close();
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 04c82af..6fa21b5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -77,7 +77,7 @@
}
public static class SortTaskState extends AbstractStateObject {
- private FrameSorter frameSorter;
+ private FrameSorterMergeSort frameSorter;
public SortTaskState() {
}
@@ -111,7 +111,7 @@
@Override
public void open() throws HyracksDataException {
state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
- state.frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
+ state.frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory,
comparatorFactories, recordDescriptors[0]);
state.frameSorter.reset();
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index a188f64..d89d577 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -63,6 +63,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
@@ -716,7 +717,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration,
WritableComparator.get(vertexIdClass).getClass());
ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, recordDescriptor);
+ nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, sort);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 195e595..bbd3a7d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -50,6 +50,7 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -195,7 +196,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, localSort);
/**
@@ -282,8 +283,8 @@
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories, nkmFactory),
- localGby, 0, globalGby, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+ nkmFactory), localGby, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
@@ -401,7 +402,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, localSort);
/**
@@ -503,8 +504,8 @@
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories, nkmFactory),
- localGby, 0, globalGby, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+ nkmFactory), localGby, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
@@ -601,7 +602,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration,
WritableComparator.get(vertexIdClass).getClass());
ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, recordDescriptor);
+ nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, sort);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index cc90f2c..0bccde5 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -32,6 +32,7 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -138,7 +139,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, localSort);
/**
@@ -338,7 +339,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, localSort);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 3b3c9e7..ae2621e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -140,7 +141,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, globalSort);
/**
@@ -327,7 +328,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, globalSort);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index e334095..7645cd6 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -137,7 +138,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, localSort);
/**
@@ -153,7 +154,7 @@
* construct global sort operator
*/
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, globalSort);
/**
@@ -341,7 +342,7 @@
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, localSort);
/**
@@ -357,7 +358,7 @@
* construct global sort operator
*/
ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
- nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
ClusterConfig.setLocationConstraint(spec, globalSort);
/**