[ASTERIXDB-2149] Enable multiple normalized keys in sort
- user model changes: no
- storage format changes: no
- interface changes: yes. The interface of sort is changed.
Currently, during the (in-memory) sort, we use an int normalized keys to
speed up comparisions by avoiding random memory accesses. However, this
technique is inefficient if the first 4 bytes of the sorting keys are
not distinctive. From performance point of view, it's better to use
longer normalized keys when it's possible (2-3x improvements).
This is enabled by this patch by:
- Allowing multiple normalized keys during sort, and the length of each
normalized key can be longer (multiple integers).
- Enable memory budgeting of pointer directories as well during sort
(but for performance, we still use int[], instead of byte[] from frame).
The next patch will enable the AsterixDB layer to use this feature to
speed up sort performance.
Change-Id: I4354242ff731b4b006b8446b58f65873047dde78
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2127
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 925ff93..bb8223d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -39,18 +39,26 @@
private static final long serialVersionUID = 1L;
private final int[] sortFields;
- private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
- private IBinaryComparatorFactory[] comparatorFactories;
+ private final INormalizedKeyComputerFactory[] keyNormalizerFactories;
+ private final IBinaryComparatorFactory[] comparatorFactories;
public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
+ this(sortFields,
+ firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+ : null,
+ comparatorFactories, projectionList);
+ }
+
+ public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+ IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
super(projectionList);
// Obs: the projection list is currently ignored.
if (projectionList != null) {
throw new NotImplementedException("Cannot push projection into InMemorySortRuntime.");
}
this.sortFields = sortFields;
- this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+ this.keyNormalizerFactories = keyNormalizerFactories;
this.comparatorFactories = comparatorFactories;
}
@@ -67,8 +75,8 @@
IFrameBufferManager manager = new VariableFrameMemoryManager(
new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT));
- frameSorter = new FrameSorterMergeSort(ctx, manager, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, outputRecordDesc);
+ frameSorter = new FrameSorterMergeSort(ctx, manager, VariableFramePool.UNLIMITED_MEMORY, sortFields,
+ keyNormalizerFactories, comparatorFactories, outputRecordDesc);
}
frameSorter.reset();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 34ed142..cc4c1b9 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -64,6 +64,7 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.io.FileReference;
@@ -722,7 +723,8 @@
new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
// the algebricks op.
- InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 1 }, null,
+ InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 1 },
+ (INormalizedKeyComputerFactory) null,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
null);
RecordDescriptor sortDesc = scannerDesc;
@@ -836,7 +838,8 @@
// the sort (by nation id)
RecordDescriptor sortDesc = scannerDesc;
- InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 3 }, null,
+ InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 3 },
+ (INormalizedKeyComputerFactory) null,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, null);
// the group-by
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
index 2c79a4d..7bf8255 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
@@ -20,4 +20,9 @@
public interface INormalizedKeyComputer {
public int normalize(byte[] bytes, int start, int length);
+
+ default void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+ int key = normalize(bytes, start, length);
+ normalizedKeys[keyStart] = key;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
index 2b7198b..901702e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
@@ -22,4 +22,21 @@
public interface INormalizedKeyComputerFactory extends Serializable {
public INormalizedKeyComputer createNormalizedKeyComputer();
+
+ /**
+ *
+ * @return The length of the normalized key in terms of integers
+ */
+ default int getNormalizedKeyLength() {
+ return 1;
+ }
+
+ /**
+ *
+ * @return Whether we can solely rely on this normalized key to complete comparison,
+ * even when two normalized keys are equal
+ */
+ default boolean isDecisive() {
+ return false;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
index 5cfef28..41c0740 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
@@ -21,7 +21,6 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class IntegerNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
private static final long serialVersionUID = 1L;
@@ -32,8 +31,13 @@
@Override
public int normalize(byte[] bytes, int start, int length) {
int value = IntegerPointable.getInteger(bytes, start);
- return value ^Integer.MIN_VALUE;
+ return value ^ Integer.MIN_VALUE;
}
};
}
+
+ @Override
+ public boolean isDecisive() {
+ return true;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
index 4ed11e6..5a59b5d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
@@ -170,8 +170,8 @@
@Override
public ITuplePointerAccessor createTuplePointerAccessor() {
return new AbstractTuplePointerAccessor() {
- private IAppendDeletableFrameTupleAccessor bufferAccessor = new DeletableFrameTupleAppender(
- recordDescriptor);
+ private final IAppendDeletableFrameTupleAccessor bufferAccessor =
+ new DeletableFrameTupleAppender(recordDescriptor);
@Override
IFrameTupleAccessor getInnerAccessor() {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
index 3f10f50..0b915dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
@@ -50,16 +50,25 @@
int framesLimit, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor outRecordDesc, Algorithm alg) throws HyracksDataException {
- this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields, firstKeyNormalizerFactory, comparatorFactories,
+ this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields,
+ firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+ : null,
+ comparatorFactories, aggregatorFactory, outRecordDesc, alg, EnumFreeSlotPolicy.LAST_FIT);
+ }
+
+ public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor inputRecordDesc,
+ int framesLimit, int[] groupFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+ IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor outRecordDesc, Algorithm alg) throws HyracksDataException {
+ this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields, keyNormalizerFactories, comparatorFactories,
aggregatorFactory, outRecordDesc, alg, EnumFreeSlotPolicy.LAST_FIT);
}
public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor inputRecordDesc,
- int framesLimit, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ int framesLimit, int[] groupFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor outRecordDesc, Algorithm alg, EnumFreeSlotPolicy policy) throws HyracksDataException {
- super(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, inputRecordDesc, alg, policy,
- framesLimit);
+ super(ctx, sortFields, keyNormalizerFactories, comparatorFactories, inputRecordDesc, alg, policy, framesLimit);
this.groupFields = groupFields;
this.comparatorFactories = comparatorFactories;
@@ -70,20 +79,20 @@
@Override
protected RunFileWriter getRunFileWriter() throws HyracksDataException {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- ExternalSortGroupByRunGenerator.class.getSimpleName());
+ FileReference file = ctx.getJobletContext()
+ .createManagedWorkspaceFile(ExternalSortGroupByRunGenerator.class.getSimpleName());
return new RunFileWriter(file, ctx.getIoManager());
}
@Override
protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException {
//create group-by comparators
- IBinaryComparator[] comparators = new IBinaryComparator[Math
- .min(groupFields.length, comparatorFactories.length)];
+ IBinaryComparator[] comparators =
+ new IBinaryComparator[Math.min(groupFields.length, comparatorFactories.length)];
for (int i = 0; i < comparators.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- return new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory,
- this.inRecordDesc, this.outRecordDesc, writer, true);
+ return new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, this.inRecordDesc,
+ this.outRecordDesc, writer, true);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index da5b4a8..23e47f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -55,39 +55,74 @@
private final RecordDescriptor partialAggRecordDesc;
private final RecordDescriptor outputRecordDesc;
private final boolean finalStage;
- private Algorithm alg = Algorithm.MERGE_SORT;
+ private static final Algorithm ALG = Algorithm.MERGE_SORT;
/**
* @param spec
- * , the Hyracks job specification
+ * the Hyracks job specification
* @param framesLimit
- * , the frame limit for this operator
+ * the frame limit for this operator
* @param sortFields
- * , the fields to sort
+ * the fields to sort
* @param groupFields
- * , the fields to group, which can be a prefix subset of sortFields
+ * the fields to group, which can be a prefix subset of sortFields
* @param firstKeyNormalizerFactory
- * , the normalized key computer factory of the first key
+ * the normalized key computer factory of the first key
* @param comparatorFactories
- * , the comparator factories of sort keys
+ * the comparator factories of sort keys
* @param partialAggregatorFactory
- * , for aggregating the input of this operator
+ * for aggregating the input of this operator
* @param mergeAggregatorFactory
- * , for aggregating the intermediate data of this operator
+ * for aggregating the intermediate data of this operator
* @param partialAggRecordDesc
- * , the record descriptor of intermediate data
+ * the record descriptor of intermediate data
* @param outRecordDesc
- * , the record descriptor of output data
+ * the record descriptor of output data
* @param finalStage
- * , whether the operator is used for final stage aggregation
+ * whether the operator is used for final stage aggregation
*/
public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory,
IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc,
RecordDescriptor outRecordDesc, boolean finalStage) {
+ this(spec, framesLimit, sortFields, groupFields,
+ firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+ : null,
+ comparatorFactories, partialAggregatorFactory, mergeAggregatorFactory, partialAggRecordDesc,
+ outRecordDesc, finalStage);
+ }
- super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, outRecordDesc);
+ /**
+ * @param spec
+ * the Hyracks job specification
+ * @param framesLimit
+ * the frame limit for this operator
+ * @param sortFields
+ * the fields to sort
+ * @param groupFields
+ * the fields to group, which can be a prefix subset of sortFields
+ * @param keyNormalizerFactories
+ * the normalized key computer factories for the prefix the sortFields
+ * @param comparatorFactories
+ * the comparator factories of sort keys
+ * @param partialAggregatorFactory
+ * for aggregating the input of this operator
+ * @param mergeAggregatorFactory
+ * for aggregating the intermediate data of this operator
+ * @param partialAggRecordDesc
+ * the record descriptor of intermediate data
+ * @param outRecordDesc
+ * the record descriptor of output data
+ * @param finalStage
+ * whether the operator is used for final stage aggregation
+ */
+ public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+ int[] groupFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+ IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory,
+ IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc,
+ RecordDescriptor outRecordDesc, boolean finalStage) {
+ super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, outRecordDesc);
if (framesLimit <= 1) {
throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
}
@@ -110,8 +145,8 @@
IRecordDescriptorProvider recordDescriptorProvider) throws HyracksDataException {
return new ExternalSortGroupByRunGenerator(ctx, sortFields,
recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit,
- groupFields, firstKeyNormalizerFactory, comparatorFactories, partialAggregatorFactory,
- partialAggRecordDesc, alg);
+ groupFields, keyNormalizerFactories, comparatorFactories, partialAggregatorFactory,
+ partialAggRecordDesc, ALG);
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
index 7c7bfec..a8cc93b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
@@ -39,24 +39,24 @@
protected final int maxSortFrames;
public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException {
- this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg,
- EnumFreeSlotPolicy.LAST_FIT, framesLimit);
+ this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, EnumFreeSlotPolicy.LAST_FIT,
+ framesLimit);
}
public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit)
- throws HyracksDataException {
- this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit,
+ throws HyracksDataException {
+ this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, policy, framesLimit,
Integer.MAX_VALUE);
}
public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit, int outputLimit)
- throws HyracksDataException {
+ throws HyracksDataException {
super();
this.ctx = ctx;
maxSortFrames = framesLimit - 1;
@@ -65,11 +65,11 @@
IFrameBufferManager bufferManager = new VariableFrameMemoryManager(
new VariableFramePool(ctx, maxSortFrames * ctx.getInitialFrameSize()), freeSlotPolicy);
if (alg == Algorithm.MERGE_SORT) {
- frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDesc, outputLimit);
+ frameSorter = new FrameSorterMergeSort(ctx, bufferManager, maxSortFrames, sortFields,
+ keyNormalizerFactories, comparatorFactories, recordDesc, outputLimit);
} else {
- frameSorter = new FrameSorterQuickSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDesc, outputLimit);
+ frameSorter = new FrameSorterQuickSort(ctx, bufferManager, maxSortFrames, sortFields,
+ keyNormalizerFactories, comparatorFactories, recordDesc, outputLimit);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index 77d5d49..6c061ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -39,46 +39,90 @@
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
import org.apache.hyracks.util.IntSerDeUtils;
public abstract class AbstractFrameSorter implements IFrameSorter {
protected Logger LOGGER = Logger.getLogger(AbstractFrameSorter.class.getName());
- static final int PTR_SIZE = 4;
- static final int ID_FRAMEID = 0;
- static final int ID_TUPLE_START = 1;
- static final int ID_TUPLE_END = 2;
- static final int ID_NORMAL_KEY = 3;
+ protected static final int ID_FRAME_ID = 0;
+ protected static final int ID_TUPLE_START = 1;
+ protected static final int ID_TUPLE_END = 2;
+ protected static final int ID_NORMALIZED_KEY = 3;
+
+ // the length of each normalized key (in terms of integers)
+ protected final int[] normalizedKeyLength;
+ // the total length of the normalized key (in term of integers)
+ protected final int normalizedKeyTotalLength;
+ // whether the normalized keys can be used to decide orders, even when normalized keys are the same
+ protected final boolean normalizedKeysDecisive;
+
+ protected final int ptrSize;
protected final int[] sortFields;
protected final IBinaryComparator[] comparators;
- protected final INormalizedKeyComputer nkc;
+ protected final INormalizedKeyComputer[] nkcs;
protected final IFrameBufferManager bufferManager;
protected final FrameTupleAccessor inputTupleAccessor;
protected final IFrameTupleAppender outputAppender;
protected final IFrame outputFrame;
protected final int outputLimit;
+ protected final long maxSortMemory;
+ protected long totalMemoryUsed;
protected int[] tPointers;
+ protected final int[] tmpPointer;
protected int tupleCount;
- private FrameTupleAccessor fta2;
- private BufferInfo info = new BufferInfo(null, -1, -1);
+ private final FrameTupleAccessor fta2;
+ private final BufferInfo info = new BufferInfo(null, -1, -1);
- public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor) throws HyracksDataException {
- this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
- Integer.MAX_VALUE);
+ public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+ int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
+ throws HyracksDataException {
+ this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+ recordDescriptor, Integer.MAX_VALUE);
}
- public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, int outputLimit)
+ public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+ int[] sortFields, INormalizedKeyComputerFactory[] normalizedKeyComputerFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
throws HyracksDataException {
this.bufferManager = bufferManager;
+ if (maxSortFrames == VariableFramePool.UNLIMITED_MEMORY) {
+ this.maxSortMemory = Long.MAX_VALUE;
+ } else {
+ this.maxSortMemory = (long) ctx.getInitialFrameSize() * maxSortFrames;
+ }
this.sortFields = sortFields;
- this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+
+ int runningNormalizedKeyTotalLength = 0;
+
+ if (normalizedKeyComputerFactories != null) {
+ int decisivePrefixLength = getDecisivePrefixLength(normalizedKeyComputerFactories);
+
+ // we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys
+ // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
+ // computing unncessary normalized keys
+ int normalizedKeys = decisivePrefixLength < normalizedKeyComputerFactories.length ? decisivePrefixLength + 1
+ : decisivePrefixLength;
+ this.nkcs = new INormalizedKeyComputer[normalizedKeys];
+ this.normalizedKeyLength = new int[normalizedKeys];
+
+ for (int i = 0; i < normalizedKeys; i++) {
+ this.nkcs[i] = normalizedKeyComputerFactories[i].createNormalizedKeyComputer();
+ this.normalizedKeyLength[i] = normalizedKeyComputerFactories[i].getNormalizedKeyLength();
+ runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
+ }
+ this.normalizedKeysDecisive = decisivePrefixLength == comparatorFactories.length;
+ } else {
+ this.nkcs = null;
+ this.normalizedKeyLength = null;
+ this.normalizedKeysDecisive = false;
+ }
+ this.normalizedKeyTotalLength = runningNormalizedKeyTotalLength;
+ this.ptrSize = ID_NORMALIZED_KEY + normalizedKeyTotalLength;
this.comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -88,17 +132,24 @@
this.outputFrame = new VSizeFrame(ctx);
this.outputLimit = outputLimit;
this.fta2 = new FrameTupleAccessor(recordDescriptor);
+ this.tmpPointer = new int[ptrSize];
}
@Override
public void reset() throws HyracksDataException {
this.tupleCount = 0;
+ this.totalMemoryUsed = 0;
this.bufferManager.reset();
}
@Override
public boolean insertFrame(ByteBuffer inputBuffer) throws HyracksDataException {
- if (bufferManager.insertFrame(inputBuffer) >= 0) {
+ inputTupleAccessor.reset(inputBuffer);
+ long requiredMemory = getRequiredMemory(inputTupleAccessor);
+ if (totalMemoryUsed + requiredMemory <= maxSortMemory && bufferManager.insertFrame(inputBuffer) >= 0) {
+ // we have enough memory
+ totalMemoryUsed += requiredMemory;
+ tupleCount += inputTupleAccessor.getTupleCount();
return true;
}
if (getFrameCount() == 0) {
@@ -108,36 +159,41 @@
return false;
}
+ protected long getRequiredMemory(FrameTupleAccessor frameAccessor) {
+ return (long) frameAccessor.getBuffer().capacity() + ptrSize * frameAccessor.getTupleCount() * Integer.BYTES;
+ }
+
@Override
public void sort() throws HyracksDataException {
- tupleCount = 0;
- for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
- bufferManager.getFrame(i, info);
- inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
- tupleCount += inputTupleAccessor.getTupleCount();
- }
- if (tPointers == null || tPointers.length < tupleCount * PTR_SIZE) {
- tPointers = new int[tupleCount * PTR_SIZE];
+ if (tPointers == null || tPointers.length < tupleCount * ptrSize) {
+ tPointers = new int[tupleCount * ptrSize];
}
int ptr = 0;
- int sfIdx = sortFields[0];
for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
bufferManager.getFrame(i, info);
inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
int tCount = inputTupleAccessor.getTupleCount();
byte[] array = inputTupleAccessor.getBuffer().array();
- for (int j = 0; j < tCount; ++j) {
+ int fieldSlotsLength = inputTupleAccessor.getFieldSlotsLength();
+ for (int j = 0; j < tCount; ++j, ++ptr) {
int tStart = inputTupleAccessor.getTupleStartOffset(j);
int tEnd = inputTupleAccessor.getTupleEndOffset(j);
- tPointers[ptr * PTR_SIZE + ID_FRAMEID] = i;
- tPointers[ptr * PTR_SIZE + ID_TUPLE_START] = tStart;
- tPointers[ptr * PTR_SIZE + ID_TUPLE_END] = tEnd;
- int f0StartRel = inputTupleAccessor.getFieldStartOffset(j, sfIdx);
- int f0EndRel = inputTupleAccessor.getFieldEndOffset(j, sfIdx);
- int f0Start = f0StartRel + tStart + inputTupleAccessor.getFieldSlotsLength();
- tPointers[ptr * PTR_SIZE + ID_NORMAL_KEY] =
- nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
- ++ptr;
+ tPointers[ptr * ptrSize + ID_FRAME_ID] = i;
+ tPointers[ptr * ptrSize + ID_TUPLE_START] = tStart;
+ tPointers[ptr * ptrSize + ID_TUPLE_END] = tEnd;
+ if (nkcs == null) {
+ continue;
+ }
+ int keyPos = ptr * ptrSize + ID_NORMALIZED_KEY;
+ for (int k = 0; k < nkcs.length; k++) {
+ int sortField = sortFields[k];
+ int fieldStartOffsetRel = inputTupleAccessor.getFieldStartOffset(j, sortField);
+ int fieldEndOffsetRel = inputTupleAccessor.getFieldEndOffset(j, sortField);
+ int fieldStartOffset = fieldStartOffsetRel + tStart + fieldSlotsLength;
+ nkcs[k].normalize(array, fieldStartOffset, fieldEndOffsetRel - fieldStartOffsetRel, tPointers,
+ keyPos);
+ keyPos += normalizedKeyLength[k];
+ }
}
}
if (tupleCount > 0) {
@@ -164,9 +220,9 @@
int limit = Math.min(tupleCount, outputLimit);
int io = 0;
for (int ptr = 0; ptr < limit; ++ptr) {
- int i = tPointers[ptr * PTR_SIZE + ID_FRAMEID];
- int tStart = tPointers[ptr * PTR_SIZE + ID_TUPLE_START];
- int tEnd = tPointers[ptr * PTR_SIZE + ID_TUPLE_END];
+ int i = tPointers[ptr * ptrSize + ID_FRAME_ID];
+ int tStart = tPointers[ptr * ptrSize + ID_TUPLE_START];
+ int tEnd = tPointers[ptr * ptrSize + ID_TUPLE_END];
bufferManager.getFrame(i, info);
inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
int flushed = FrameUtils.appendToWriter(writer, outputAppender, inputTupleAccessor, tStart, tEnd);
@@ -185,19 +241,23 @@
}
protected final int compare(int tp1, int tp2) throws HyracksDataException {
- int i1 = tPointers[tp1 * 4 + ID_FRAMEID];
- int j1 = tPointers[tp1 * 4 + ID_TUPLE_START];
- int v1 = tPointers[tp1 * 4 + ID_NORMAL_KEY];
+ return compare(tPointers, tp1, tPointers, tp2);
+ }
- int tp2i = tPointers[tp2 * 4 + ID_FRAMEID];
- int tp2j = tPointers[tp2 * 4 + ID_TUPLE_START];
- int tp2v = tPointers[tp2 * 4 + ID_NORMAL_KEY];
-
- if (v1 != tp2v) {
- return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
+ protected final int compare(int[] tPointers1, int tp1, int[] tPointers2, int tp2) throws HyracksDataException {
+ if (nkcs != null) {
+ int cmpNormalizedKey = compareNormalizeKeys(tPointers1, tp1 * ptrSize + ID_NORMALIZED_KEY, tPointers2,
+ tp2 * ptrSize + ID_NORMALIZED_KEY, normalizedKeyTotalLength);
+ if (cmpNormalizedKey != 0 || normalizedKeysDecisive) {
+ return cmpNormalizedKey;
+ }
}
- int i2 = tp2i;
- int j2 = tp2j;
+
+ int i1 = tPointers1[tp1 * ptrSize + ID_FRAME_ID];
+ int j1 = tPointers1[tp1 * ptrSize + ID_TUPLE_START];
+ int i2 = tPointers2[tp2 * ptrSize + ID_FRAME_ID];
+ int j2 = tPointers2[tp2 * ptrSize + ID_TUPLE_START];
+
bufferManager.getFrame(i1, info);
byte[] b1 = info.getBuffer().array();
inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
@@ -223,6 +283,43 @@
return 0;
}
+ public static int compareNormalizeKeys(int[] keys1, int start1, int[] keys2, int start2, int length) {
+ for (int i = 0; i < length; i++) {
+ int key1 = keys1[start1 + i];
+ int key2 = keys2[start2 + i];
+ if (key1 != key2) {
+ return (((key1) & 0xffffffffL) < ((key2) & 0xffffffffL)) ? -1 : 1;
+ }
+ }
+ return 0;
+ }
+
+ public static int getDecisivePrefixLength(INormalizedKeyComputerFactory[] keyNormalizerFactories) {
+ if (keyNormalizerFactories == null) {
+ return 0;
+ }
+ for (int i = 0; i < keyNormalizerFactories.length; i++) {
+ if (!keyNormalizerFactories[i].isDecisive()) {
+ return i;
+ }
+ }
+ return keyNormalizerFactories.length;
+ }
+
+ protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
+ System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize);
+ System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize, ptrSize);
+ System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize);
+ }
+
+ protected void copy(int src[], int srcPos, int dest[], int destPos) {
+ System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, ptrSize);
+ }
+
+ protected void copy(int src[], int srcPos, int dest[], int destPos, int n) {
+ System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n * ptrSize);
+ }
+
@Override
public void close() {
tupleCount = 0;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index 1cd5fc3..602157f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -56,17 +56,17 @@
protected static final int MERGE_ACTIVITY_ID = 1;
protected final int[] sortFields;
- protected final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+ protected final INormalizedKeyComputerFactory[] keyNormalizerFactories;
protected final IBinaryComparatorFactory[] comparatorFactories;
protected final int framesLimit;
public AbstractSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) {
super(spec, 1, 1);
this.framesLimit = framesLimit;
this.sortFields = sortFields;
- this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+ this.keyNormalizerFactories = keyNormalizerFactories;
this.comparatorFactories = comparatorFactories;
outRecDescs[0] = recordDescriptor;
}
@@ -174,8 +174,8 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
- : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ INormalizedKeyComputer nmkComputer = keyNormalizerFactories == null ? null
+ : keyNormalizerFactories[0].createNormalizedKeyComputer();
AbstractExternalSortRunMerger merger = getSortRunMerger(ctx, recordDescProvider, writer, sorter,
runs, comparators, nmkComputer, framesLimit);
merger.process();
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 1b66ccf..b58d4c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -43,21 +43,31 @@
private final int outputLimit;
public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor, Algorithm alg) {
- this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, alg,
+ this(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor, alg,
EnumFreeSlotPolicy.LAST_FIT);
}
public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
- this(spec, framesLimit, sortFields, null, comparatorFactories, recordDescriptor);
+ this(spec, framesLimit, sortFields, (INormalizedKeyComputerFactory[]) null, comparatorFactories,
+ recordDescriptor);
}
public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) {
- this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+ this(spec, framesLimit, sortFields,
+ firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+ : null,
+ comparatorFactories, recordDescriptor, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT);
+ }
+
+ public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor) {
+ this(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor,
Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT);
}
@@ -69,7 +79,7 @@
@Override
protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
- return new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+ return new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories, comparatorFactories,
outRecDescs[0], alg, policy, framesLimit, outputLimit);
}
};
@@ -92,16 +102,16 @@
}
public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy) {
- this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, alg,
- policy, Integer.MAX_VALUE);
+ this(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor, alg, policy,
+ Integer.MAX_VALUE);
}
public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy, int outputLimit) {
- super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+ super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
if (framesLimit <= 1) {
throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index b451b1c..785b94e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -31,32 +31,32 @@
public class ExternalSortRunGenerator extends AbstractExternalSortRunGenerator {
public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException {
- this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg,
- EnumFreeSlotPolicy.LAST_FIT, framesLimit);
+ this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, EnumFreeSlotPolicy.LAST_FIT,
+ framesLimit);
}
public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit)
- throws HyracksDataException {
- this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit,
+ throws HyracksDataException {
+ this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, policy, framesLimit,
Integer.MAX_VALUE);
}
public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit, int outputLimit)
- throws HyracksDataException {
- super(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit,
+ throws HyracksDataException {
+ super(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, policy, framesLimit,
outputLimit);
}
@Override
protected RunFileWriter getRunFileWriter() throws HyracksDataException {
- FileReference file = ctx.getJobletContext()
- .createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
+ FileReference file =
+ ctx.getJobletContext().createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
return new RunFileWriter(file, ctx.getIoManager());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
index ed28560..260b665 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
@@ -23,24 +23,27 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
public class FrameSorterMergeSort extends AbstractFrameSorter {
private int[] tPointersTemp;
- public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor) throws HyracksDataException {
- this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
- Integer.MAX_VALUE);
+ public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+ int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
+ throws HyracksDataException {
+ this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+ recordDescriptor, Integer.MAX_VALUE);
}
- public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException {
- super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
- outputLimit);
+ public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+ int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
+ throws HyracksDataException {
+ super(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+ recordDescriptor, outputLimit);
}
@Override
@@ -52,6 +55,11 @@
}
@Override
+ protected long getRequiredMemory(FrameTupleAccessor frameAccessor) {
+ return super.getRequiredMemory(frameAccessor) + ptrSize * frameAccessor.getTupleCount() * Integer.BYTES;
+ }
+
+ @Override
public void close() {
super.close();
tPointersTemp = null;
@@ -68,7 +76,7 @@
if (next < end) {
merge(i, next, step, Math.min(step, end - next));
} else {
- System.arraycopy(tPointers, i * 4, tPointersTemp, i * 4, (end - i) * 4);
+ copy(tPointers, i, tPointersTemp, i, end - i);
}
}
/** prepare next phase merge */
@@ -91,29 +99,21 @@
while (pos1 <= end1 && pos2 <= end2) {
int cmp = compare(pos1, pos2);
if (cmp <= 0) {
- copy(pos1, targetPos);
+ copy(tPointers, pos1, tPointersTemp, targetPos);
pos1++;
} else {
- copy(pos2, targetPos);
+ copy(tPointers, pos2, tPointersTemp, targetPos);
pos2++;
}
targetPos++;
}
if (pos1 <= end1) {
int rest = end1 - pos1 + 1;
- System.arraycopy(tPointers, pos1 * 4, tPointersTemp, targetPos * 4, rest * 4);
+ copy(tPointers, pos1, tPointersTemp, targetPos, rest);
}
if (pos2 <= end2) {
int rest = end2 - pos2 + 1;
- System.arraycopy(tPointers, pos2 * 4, tPointersTemp, targetPos * 4, rest * 4);
+ copy(tPointers, pos2, tPointersTemp, targetPos, rest);
}
}
-
- private void copy(int src, int dest) {
- tPointersTemp[dest * 4 + ID_FRAMEID] = tPointers[src * 4 + ID_FRAMEID];
- tPointersTemp[dest * 4 + ID_TUPLE_START] = tPointers[src * 4 + ID_TUPLE_START];
- tPointersTemp[dest * 4 + ID_TUPLE_END] = tPointers[src * 4 + ID_TUPLE_END];
- tPointersTemp[dest * 4 + ID_NORMAL_KEY] = tPointers[src * 4 + ID_NORMAL_KEY];
- }
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
index cf864f6..486bc7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
@@ -27,18 +27,20 @@
public class FrameSorterQuickSort extends AbstractFrameSorter {
- public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor) throws HyracksDataException {
- this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
- Integer.MAX_VALUE);
+ public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+ int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
+ throws HyracksDataException {
+ this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+ recordDescriptor, Integer.MAX_VALUE);
}
- public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException {
- super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
- outputLimit);
+ public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+ int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
+ throws HyracksDataException {
+ super(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+ recordDescriptor, outputLimit);
}
@Override
@@ -60,7 +62,7 @@
break;
}
if (cmp == 0) {
- swap(tPointers, a++, b);
+ swap(tPointers, a++, tPointers, b);
}
++b;
}
@@ -70,13 +72,13 @@
break;
}
if (cmp == 0) {
- swap(tPointers, c, d--);
+ swap(tPointers, c, tPointers, d--);
}
--c;
}
if (b > c)
break;
- swap(tPointers, b++, c--);
+ swap(tPointers, b++, tPointers, c--);
}
int s;
@@ -94,17 +96,9 @@
}
}
- private void swap(int x[], int a, int b) {
- for (int i = 0; i < 4; ++i) {
- int t = x[a * 4 + i];
- x[a * 4 + i] = x[b * 4 + i];
- x[b * 4 + i] = t;
- }
- }
-
private void vecswap(int x[], int a, int b, int n) {
for (int i = 0; i < n; i++, a++, b++) {
- swap(x, a, b);
+ swap(x, a, x, b);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
index a058624..1578975 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
@@ -31,31 +31,31 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.buffermanager.IFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
-import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.IFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
public class HeapSortRunGenerator extends AbstractSortRunGenerator {
protected final IHyracksTaskContext ctx;
protected final int frameLimit;
protected final int topK;
protected final int[] sortFields;
- protected final INormalizedKeyComputerFactory nmkFactory;
+ protected final INormalizedKeyComputerFactory[] nmkFactories;
protected final IBinaryComparatorFactory[] comparatorFactories;
protected final RecordDescriptor recordDescriptor;
protected ITupleSorter tupleSorter;
protected IFrameTupleAccessor inAccessor;
public HeapSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) {
super();
this.ctx = ctx;
this.frameLimit = frameLimit;
this.topK = topK;
this.sortFields = sortFields;
- this.nmkFactory = firstKeyNormalizerFactory;
+ this.nmkFactories = keyNormalizerFactories;
this.comparatorFactories = comparatorFactories;
this.inAccessor = new FrameTupleAccessor(recordDescriptor);
this.recordDescriptor = recordDescriptor;
@@ -64,8 +64,9 @@
@Override
public void open() throws HyracksDataException {
IFramePool framePool = new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize());
- IDeletableTupleBufferManager bufferManager = new VariableDeletableTupleMemoryManager(framePool, recordDescriptor);
- tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactory, comparatorFactories);
+ IDeletableTupleBufferManager bufferManager =
+ new VariableDeletableTupleMemoryManager(framePool, recordDescriptor);
+ tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactories, comparatorFactories);
super.open();
}
@@ -76,8 +77,8 @@
@Override
protected RunFileWriter getRunFileWriter() throws HyracksDataException {
- FileReference file = ctx.getJobletContext()
- .createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName());
+ FileReference file =
+ ctx.getJobletContext().createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName());
return new RunFileWriter(file, ctx.getIoManager());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
index 4311128..80b36ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
@@ -43,9 +43,9 @@
private int tupleSorterFlushedTimes = 0;
public HybridTopKSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) {
- super(ctx, frameLimit, topK, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+ super(ctx, frameLimit, topK, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
}
@Override
@@ -60,8 +60,8 @@
@Override
protected RunFileWriter getRunFileWriter() throws HyracksDataException {
- FileReference file = ctx.getJobletContext()
- .createManagedWorkspaceFile(HybridTopKSortRunGenerator.class.getSimpleName());
+ FileReference file =
+ ctx.getJobletContext().createManagedWorkspaceFile(HybridTopKSortRunGenerator.class.getSimpleName());
return new RunFileWriter(file, ctx.getIoManager());
}
@@ -101,8 +101,8 @@
new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize()),
FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.BIGGEST_FIT,
frameLimit - 1));
- frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, nmkFactory, comparatorFactories,
- recordDescriptor, topK);
+ frameSorter = new FrameSorterMergeSort(ctx, bufferManager, frameLimit - 1, sortFields, nmkFactories,
+ comparatorFactories, recordDescriptor, topK);
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("create frameSorter");
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 996101b..adc0d5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -53,8 +53,8 @@
private static final int MERGE_ACTIVITY_ID = 1;
private final int[] sortFields;
- private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
- private IBinaryComparatorFactory[] comparatorFactories;
+ private final INormalizedKeyComputerFactory[] keyNormalizerFactories;
+ private final IBinaryComparatorFactory[] comparatorFactories;
public InMemorySortOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] sortFields,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
@@ -62,11 +62,11 @@
}
public InMemorySortOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) {
super(spec, 1, 1);
this.sortFields = sortFields;
- this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+ this.keyNormalizerFactories = keyNormalizerFactories;
this.comparatorFactories = comparatorFactories;
outRecDescs[0] = recordDescriptor;
}
@@ -123,8 +123,9 @@
new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT));
- state.frameSorter = new FrameSorterMergeSort(ctx, frameBufferManager, sortFields,
- firstKeyNormalizerFactory, comparatorFactories, outRecDescs[0]);
+ state.frameSorter =
+ new FrameSorterMergeSort(ctx, frameBufferManager, VariableFramePool.UNLIMITED_MEMORY,
+ sortFields, keyNormalizerFactories, comparatorFactories, outRecDescs[0]);
state.frameSorter.reset();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index 988eea3..a90d48f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -41,7 +41,16 @@
public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) {
- super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+ this(spec, framesLimit, topK, sortFields,
+ firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+ : null,
+ comparatorFactories, recordDescriptor);
+ }
+
+ public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
+ INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor) {
+ super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
this.topK = topK;
}
@@ -53,7 +62,7 @@
@Override
protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider) {
- return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, firstKeyNormalizerFactory,
+ return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, keyNormalizerFactories,
comparatorFactories, outRecDescs[0]);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index c473819..980857a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -55,18 +55,19 @@
}
class HeapEntry implements IResetableComparable<HeapEntry> {
- int nmk;
+ int[] nmk;
TuplePointer tuplePointer;
public HeapEntry() {
tuplePointer = new TuplePointer();
- nmk = 0;
+ nmk = new int[normalizedKeyTotalLength];
}
@Override
public int compareTo(HeapEntry o) {
- if (nmk != o.nmk) {
- return ((((long) nmk) & 0xffffffffL) < (((long) o.nmk) & 0xffffffffL)) ? -1 : 1;
+ int cmpNormalizedKey = AbstractFrameSorter.compareNormalizeKeys(nmk, 0, o.nmk, 0, normalizedKeyTotalLength);
+ if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
+ return cmpNormalizedKey;
}
bufferAccessor1.reset(tuplePointer);
bufferAccessor2.reset(o.tuplePointer);
@@ -93,13 +94,15 @@
return 0;
}
- public void reset(int nmkey) {
- nmk = nmkey;
+ public void reset(int[] nmkey) {
+ if (normalizedKeyTotalLength > 0) {
+ System.arraycopy(nmkey, 0, nmk, 0, normalizedKeyTotalLength);
+ }
}
@Override
public void reset(HeapEntry other) {
- nmk = other.nmk;
+ reset(other.nmk);
tuplePointer.reset(other.tuplePointer);
}
}
@@ -111,19 +114,23 @@
private final FrameTupleAppender outputAppender;
private final IFrame outputFrame;
private final int[] sortFields;
- private final INormalizedKeyComputer nkc;
+ private final INormalizedKeyComputer[] nkcs;
+ private final boolean normalizedKeyDecisive;
+ private final int[] normalizedKeyLength;
+ private final int normalizedKeyTotalLength;
private final IBinaryComparator[] comparators;
- private HeapEntry maxEntry;
- private HeapEntry newEntry;
+ private final HeapEntry maxEntry;
+ private final HeapEntry newEntry;
private MaxHeap heap;
private boolean isSorted;
+ private final int[] nmk;
+
public TupleSorterHeapSort(IHyracksTaskContext ctx, IDeletableTupleBufferManager bufferManager, int topK,
- int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories)
- throws HyracksDataException {
+ int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+ IBinaryComparatorFactory[] comparatorFactories) throws HyracksDataException {
this.bufferManager = bufferManager;
this.bufferAccessor1 = bufferManager.createTuplePointerAccessor();
this.bufferAccessor2 = bufferManager.createTuplePointerAccessor();
@@ -131,7 +138,31 @@
this.outputFrame = new VSizeFrame(ctx);
this.outputAppender = new FrameTupleAppender();
this.sortFields = sortFields;
- this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+
+ int runningNormalizedKeyTotalLength = 0;
+ if (keyNormalizerFactories != null) {
+ int decisivePrefixLength = AbstractFrameSorter.getDecisivePrefixLength(keyNormalizerFactories);
+
+ // we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys
+ // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
+ // computing unncessary normalized keys
+ int normalizedKeys = decisivePrefixLength < keyNormalizerFactories.length ? decisivePrefixLength + 1
+ : decisivePrefixLength;
+ this.nkcs = new INormalizedKeyComputer[normalizedKeys];
+ this.normalizedKeyLength = new int[normalizedKeys];
+
+ for (int i = 0; i < normalizedKeys; i++) {
+ this.nkcs[i] = keyNormalizerFactories[i].createNormalizedKeyComputer();
+ this.normalizedKeyLength[i] = keyNormalizerFactories[i].getNormalizedKeyLength();
+ runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
+ }
+ this.normalizedKeyDecisive = decisivePrefixLength == comparatorFactories.length;
+ } else {
+ this.nkcs = null;
+ this.normalizedKeyLength = null;
+ this.normalizedKeyDecisive = false;
+ }
+ this.normalizedKeyTotalLength = runningNormalizedKeyTotalLength;
this.comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -141,6 +172,7 @@
this.maxEntry = new HeapEntry();
this.newEntry = new HeapEntry();
this.isSorted = false;
+ this.nmk = new int[runningNormalizedKeyTotalLength];
}
@Override
@@ -154,7 +186,7 @@
throw new HyracksDataException(
"The Heap haven't be reset after sorting, the order of using this class is not correct.");
}
- int nmkey = getPNK(frameTupleAccessor, index);
+ int[] nmkey = getPNK(frameTupleAccessor, index);
if (heap.getNumEntries() >= topK) {
heap.peekMax(maxEntry);
if (compareTuple(frameTupleAccessor, index, nmkey, maxEntry) >= 0) {
@@ -175,20 +207,29 @@
return true;
}
- private int getPNK(IFrameTupleAccessor fta, int tIx) {
- if (nkc == null) {
- return 0;
+ private int[] getPNK(IFrameTupleAccessor fta, int tIx) {
+ if (nkcs == null) {
+ return nmk;
}
- int sfIdx = sortFields[0];
- return nkc.normalize(fta.getBuffer().array(), fta.getAbsoluteFieldStartOffset(tIx, sfIdx),
- fta.getFieldLength(tIx, sfIdx));
+ int keyPos = 0;
+ byte[] buffer = fta.getBuffer().array();
+ for (int i = 0; i < nkcs.length; i++) {
+ int sfIdx = sortFields[i];
+ nkcs[i].normalize(buffer, fta.getAbsoluteFieldStartOffset(tIx, sfIdx), fta.getFieldLength(tIx, sfIdx), nmk,
+ keyPos);
+ keyPos += normalizedKeyLength[i];
+ }
+ return nmk;
}
- private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int nmkey, HeapEntry maxEntry)
+ private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int[] nmkey, HeapEntry maxEntry)
throws HyracksDataException {
- if (nmkey != maxEntry.nmk) {
- return ((((long) nmkey) & 0xffffffffL) < (((long) maxEntry.nmk) & 0xffffffffL)) ? -1 : 1;
+ int cmpNormalizedKey =
+ AbstractFrameSorter.compareNormalizeKeys(nmkey, 0, maxEntry.nmk, 0, normalizedKeyTotalLength);
+ if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
+ return cmpNormalizedKey;
}
+
bufferAccessor2.reset(maxEntry.tuplePointer);
byte[] b1 = frameTupleAccessor.getBuffer().array();
byte[] b2 = bufferAccessor2.getBuffer().array();
@@ -254,9 +295,8 @@
for (int i = 0; i < numEntries; i++) {
HeapEntry minEntry = (HeapEntry) entries[i];
bufferAccessor1.reset(minEntry.tuplePointer);
- int flushed = FrameUtils
- .appendToWriter(writer, outputAppender, bufferAccessor1.getBuffer().array(),
- bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
+ int flushed = FrameUtils.appendToWriter(writer, outputAppender, bufferAccessor1.getBuffer().array(),
+ bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
if (flushed > 0) {
maxFrameSize = Math.max(maxFrameSize, flushed);
io++;
@@ -265,8 +305,7 @@
maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
outputAppender.write(writer, true);
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(
- "Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
+ LOGGER.info("Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
}
return maxFrameSize;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 160336a..56bf853 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.dataset.ResultSetId;
@@ -58,31 +59,33 @@
JobSpecification spec = new JobSpecification();
FileSplit[] ordersSplits = new FileSplit[] {
- new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator
- + "orders-part1.tbl"),
- new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator
- + "orders-part2.tbl") };
+ new ManagedFileSplit(NC1_ID,
+ "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl"),
+ new ManagedFileSplit(NC2_ID,
+ "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
+ RecordDescriptor ordersDesc =
+ new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
+ ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
int outputLimit = 5; // larger than the total record numbers.
- TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4,
- outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+ TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 },
+ (INormalizedKeyComputerFactory) null,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
@@ -90,23 +93,21 @@
spec.addResultSetId(rsId);
FileSplit fs = createFile(nc1);
- IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(
- new FileSplit[] { fs });
+ IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(new FileSplit[] { fs });
IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
- spec.connect(
- new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] {
- 1, 0 }, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ new int[] { 1, 0 },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+ new int[] { 1, 0 },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
runTest(spec);
System.out.println("Result write into :" + fs.getPath() + " in node: " + fs.getNodeName());
@@ -122,31 +123,33 @@
JobSpecification spec = new JobSpecification();
FileSplit[] ordersSplits = new FileSplit[] {
- new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator
- + "orders-part1.tbl"),
- new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator
- + "orders-part2.tbl") };
+ new ManagedFileSplit(NC1_ID,
+ "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl"),
+ new ManagedFileSplit(NC2_ID,
+ "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
+ RecordDescriptor ordersDesc =
+ new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
+ ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
int outputLimit = 20;
- TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4,
- outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+ TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 },
+ (INormalizedKeyComputerFactory) null,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index cfd4f30..d3b6b5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -19,7 +19,7 @@
package org.apache.hyracks.tests.unit;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.io.DataInputStream;
import java.util.ArrayList;
@@ -57,11 +57,11 @@
static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
new UTF8StringSerializerDeserializer() };
static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
- static Random GRandom = new Random(System.currentTimeMillis());
+ static Random GRandom = new Random(0);
static int[] SortFields = new int[] { 0, 1 };
- static IBinaryComparatorFactory[] ComparatorFactories = new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+ static IBinaryComparatorFactory[] ComparatorFactories =
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
static void assertMaxFrameSizesAreAllEqualsTo(List<GeneratedRunFileReader> maxSize, int pageSize) {
for (int i = 0; i < maxSize.size(); i++) {
@@ -69,25 +69,30 @@
}
}
- abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
- throws HyracksDataException;
+ abstract AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit,
+ int numOfInputRecord) throws HyracksDataException;
- protected List<GeneratedRunFileReader> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize,
- int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
+ protected List<List<GeneratedRunFileReader>> testSortRecords(int pageSize, int frameLimit, int numRuns,
+ int minRecordSize, int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
IHyracksTaskContext ctx = testUtils.create(pageSize);
HashMap<Integer, String> keyValuePair = new HashMap<>();
List<IFrame> frameList = new ArrayList<>();
prepareData(ctx, frameList, pageSize * frameLimit * numRuns, minRecordSize, maxRecordSize, specialData,
keyValuePair);
- AbstractSortRunGenerator runGenerator = getSortRunGenerator(ctx, frameLimit, keyValuePair.size());
- runGenerator.open();
- for (IFrame frame : frameList) {
- runGenerator.nextFrame(frame.getBuffer());
+
+ List<List<GeneratedRunFileReader>> results = new ArrayList<>();
+ AbstractSortRunGenerator[] runGenerators = getSortRunGenerator(ctx, frameLimit, keyValuePair.size());
+ for (AbstractSortRunGenerator runGenerator : runGenerators) {
+ runGenerator.open();
+ for (IFrame frame : frameList) {
+ runGenerator.nextFrame(frame.getBuffer());
+ }
+ runGenerator.close();
+ matchResult(ctx, runGenerator.getRuns(), keyValuePair);
+ results.add(runGenerator.getRuns());
}
- runGenerator.close();
- matchResult(ctx, runGenerator.getRuns(), keyValuePair);
- return runGenerator.getRuns();
+ return results;
}
static void matchResult(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
@@ -114,7 +119,9 @@
bbis.setByteBuffer(fta.getBuffer(),
fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 1) + fta.getFieldSlotsLength());
String value = (String) RecordDesc.getFields()[1].deserialize(di);
-
+ if (!keyValuePair.containsKey(key)) {
+ assertTrue(false);
+ }
if (!keyValuePair.get(key).equals(value)) {
assertTrue(false);
}
@@ -146,7 +153,7 @@
static void prepareData(IHyracksTaskContext ctx, List<IFrame> frameList, int minDataSize, int minRecordSize,
int maxRecordSize, Map<Integer, String> specialData, Map<Integer, String> keyValuePair)
- throws HyracksDataException {
+ throws HyracksDataException {
ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
FrameTupleAppender appender = new FrameTupleAppender();
@@ -158,8 +165,9 @@
tb.addField(IntegerSerializerDeserializer.INSTANCE, entry.getKey());
tb.addField(new UTF8StringSerializerDeserializer(), entry.getValue());
- VSizeFrame frame = new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(
- tb.getFieldEndOffsets().length, tb.getSize(), ctx.getInitialFrameSize()));
+ VSizeFrame frame =
+ new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length,
+ tb.getSize(), ctx.getInitialFrameSize()));
appender.reset(frame, true);
assertTrue(appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()));
frameList.add(frame);
@@ -226,9 +234,25 @@
int numRuns = 2;
int minRecordSize = pageSize / 8;
int maxRecordSize = pageSize / 8;
- List<GeneratedRunFileReader> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
- maxRecordSize, null);
- assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
+ List<List<GeneratedRunFileReader>> maxSizes =
+ testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null);
+ for (List<GeneratedRunFileReader> maxSize : maxSizes) {
+ assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
+ }
+ }
+
+ @Test
+ public void testAllManySmallRecords() throws HyracksDataException {
+ int pageSize = 10240;
+ int frameLimit = 4;
+ int numRuns = 2;
+ int minRecordSize = pageSize / 8;
+ int maxRecordSize = pageSize / 8;
+ List<List<GeneratedRunFileReader>> maxSizes =
+ testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null);
+ for (List<GeneratedRunFileReader> maxSize : maxSizes) {
+ assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
+ }
}
@Test
@@ -238,9 +262,11 @@
int numRuns = 2;
int minRecordSize = pageSize;
int maxRecordSize = (int) (pageSize * 1.8);
- List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
- null);
- assertMaxFrameSizesAreAllEqualsTo(size, pageSize * 2);
+ List<List<GeneratedRunFileReader>> maxSizes =
+ testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null);
+ for (List<GeneratedRunFileReader> maxSize : maxSizes) {
+ assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize * 2);
+ }
}
@Test
@@ -250,15 +276,16 @@
int numRuns = 4;
int minRecordSize = 20;
int maxRecordSize = pageSize / 2;
- HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit - 1);
- List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
- specialPair);
-
- int max = 0;
- for (GeneratedRunFileReader run : size) {
- max = Math.max(max, run.getMaxFrameSize());
+ HashMap<Integer, String> specialPair = generateBigObject(pageSize / 2, frameLimit - 1);
+ List<List<GeneratedRunFileReader>> sizes =
+ testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, specialPair);
+ for (List<GeneratedRunFileReader> size : sizes) {
+ int max = 0;
+ for (GeneratedRunFileReader run : size) {
+ max = Math.max(max, run.getMaxFrameSize());
+ }
+ assertTrue(max <= pageSize * (frameLimit - 1) && max >= pageSize * 2);
}
- assertTrue(max == pageSize * (frameLimit - 1));
}
@Test(expected = HyracksDataException.class)
@@ -269,8 +296,6 @@
HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit);
int minRecordSize = 10;
int maxRecordSize = pageSize / 2;
- List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
- specialPair);
-
+ testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, specialPair);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
index fd57f1e..6765d1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
@@ -20,7 +20,11 @@
package org.apache.hyracks.tests.unit;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
import org.apache.hyracks.dataflow.std.sort.Algorithm;
import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
@@ -28,9 +32,20 @@
public class ExternalSortRunGeneratorTest extends AbstractRunGeneratorTest {
@Override
- AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+ AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
throws HyracksDataException {
- return new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories, RecordDesc,
- Algorithm.MERGE_SORT, frameLimit);
+ ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories,
+ RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit, Integer.MAX_VALUE);
+ ExternalSortRunGenerator runGeneratorWithOneNormalizeKey = new ExternalSortRunGenerator(ctx, SortFields,
+ new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() }, ComparatorFactories,
+ RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit, Integer.MAX_VALUE);
+ ExternalSortRunGenerator runGeneratorWithNormalizeKeys = new ExternalSortRunGenerator(ctx, SortFields,
+ new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory(),
+ new UTF8StringNormalizedKeyComputerFactory() },
+ ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit,
+ Integer.MAX_VALUE);
+
+ return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizeKey,
+ runGeneratorWithNormalizeKeys };
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java
index d219a56..5d9e771 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java
@@ -19,23 +19,37 @@
package org.apache.hyracks.tests.unit;
-import org.junit.Test;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator;
+import org.junit.Test;
public class HeapSortRunGeneratorTest extends AbstractRunGeneratorTest {
@Override
- AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+ AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
throws HyracksDataException {
- return new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, null, ComparatorFactories,
- RecordDesc);
+ HeapSortRunGenerator runGenerator = new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields,
+ null, ComparatorFactories, RecordDesc);
+ HeapSortRunGenerator runGeneratorWithOneNormalizedKey =
+ new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields,
+ new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() },
+ ComparatorFactories, RecordDesc);
+ HeapSortRunGenerator runGeneratorWithNormalizedKeys = new HeapSortRunGenerator(ctx, frameLimit,
+ numOfInputRecord, SortFields, new INormalizedKeyComputerFactory[] {
+ new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() },
+ ComparatorFactories, RecordDesc);
+
+ return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizedKey,
+ runGeneratorWithNormalizedKeys };
+
}
@Test
- public void testTopK(){
+ public void testTopK() {
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java
deleted file mode 100644
index d1080f8..0000000
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.tests.unit;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
-
-public class HybridSortRunGenerator extends AbstractRunGeneratorTest {
- @Override
- AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
- throws HyracksDataException {
- return new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, null, ComparatorFactories,
- RecordDesc);
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java
new file mode 100644
index 0000000..d91f1e1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
+
+public class HybridSortRunGeneratorTest extends AbstractRunGeneratorTest {
+ @Override
+ AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+ throws HyracksDataException {
+ HybridTopKSortRunGenerator runGenerator = new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord,
+ SortFields, null, ComparatorFactories, RecordDesc);
+ HybridTopKSortRunGenerator runGeneratorWithOneNormalizedKey =
+ new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields,
+ new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() },
+ ComparatorFactories, RecordDesc);
+ HybridTopKSortRunGenerator runGeneratorWithNormalizedKeys = new HybridTopKSortRunGenerator(ctx, frameLimit,
+ numOfInputRecord, SortFields, new INormalizedKeyComputerFactory[] {
+ new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() },
+ ComparatorFactories, RecordDesc);
+
+ return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizedKey,
+ runGeneratorWithNormalizedKeys };
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
index c68d59d..a219518 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -19,16 +19,8 @@
package org.apache.hyracks.tests.unit;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.GRandom;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.generateRandomRecord;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.matchResult;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.prepareData;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.testUtils;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.*;
+import static org.junit.Assert.*;
import java.io.DataInputStream;
import java.util.ArrayList;
@@ -71,7 +63,7 @@
private final int numFrames;
private final int minRecordSize;
private final int maxRecordSize;
- private TreeMap<Integer, String> result = new TreeMap<>();
+ private final TreeMap<Integer, String> result = new TreeMap<>();
int maxFrameSize;
ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
@@ -186,8 +178,8 @@
prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
frameList, keyValueMapList);
- RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
- null, RecordDesc);
+ RunMergingFrameReader reader =
+ new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
testMergeSucceed(ctx, reader, keyValueMapList);
}
@@ -207,8 +199,8 @@
prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
frameList, keyValueMapList);
- RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
- null, RecordDesc);
+ RunMergingFrameReader reader =
+ new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
testMergeSucceed(ctx, reader, keyValueMapList);
}
@@ -291,8 +283,8 @@
prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
frameList, keyValueMap);
- RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
- null, RecordDesc);
+ RunMergingFrameReader reader =
+ new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
testMergeSucceed(ctx, reader, keyValueMap);
}
@@ -342,8 +334,8 @@
for (GeneratedRunFileReader run : runGenerator.getRuns()) {
runs.add(run);
}
- RunMergingFrameReader reader = new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null,
- RecordDesc);
+ RunMergingFrameReader reader =
+ new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null, RecordDesc);
IFrame outFrame = new VSizeFrame(ctx);
reader.open();
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
index f621bf9..b2a8323 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
@@ -19,13 +19,8 @@
package org.apache.hyracks.tests.unit;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SerDers;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.assertFTADataIsSorted;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.*;
+import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -38,10 +33,13 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator;
import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
@@ -126,12 +124,24 @@
}
@Test
- public void testHybridTopKShouldSwitchToFrameSorterWhenFlushed() {
- int topK = 1;
+ public void testHybridTopKWithOneNormalizedKey() throws HyracksDataException {
+ int topK = SORT_FRAME_LIMIT;
IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
- AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields, null,
- ComparatorFactories, RecordDesc);
+ AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields,
+ new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() }, ComparatorFactories,
+ RecordDesc);
+ testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
+ }
+ @Test
+ public void testHybridTopKWithTwoNormalizedKeys() throws HyracksDataException {
+ int topK = SORT_FRAME_LIMIT;
+ IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+ AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(
+ ctx, SORT_FRAME_LIMIT, topK, SortFields, new INormalizedKeyComputerFactory[] {
+ new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() },
+ ComparatorFactories, RecordDesc);
+ testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
}
private void testInMemoryOnly(IHyracksTaskContext ctx, int topK, ORDER order, AbstractSortRunGenerator sorter)
@@ -148,7 +158,7 @@
List<IFrame> frameList = new ArrayList<>();
int minDataSize = PAGE_SIZE * NUM_PAGES * 4 / 5;
- int minRecordSize = 16;
+ int minRecordSize = 64;
int maxRecordSize = 64;
AbstractRunGeneratorTest.prepareData(ctx, frameList, minDataSize, minRecordSize, maxRecordSize, null,
@@ -162,7 +172,6 @@
doSort(sorter, buffer);
- assertEquals(0, sorter.getRuns().size());
validateResult(sorter, topKAnswer);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index 650c60d..23a6be0 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.io.FileSplit;
@@ -139,11 +140,11 @@
JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
- RecordDescriptor wordDesc = new RecordDescriptor(
- new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+ RecordDescriptor wordDesc =
+ new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
- FileScanOperatorDescriptor wordScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
- new WordTupleParserFactory(), wordDesc);
+ FileScanOperatorDescriptor wordScanner =
+ new FileScanOperatorDescriptor(spec, splitsProvider, new WordTupleParserFactory(), wordDesc);
createPartitionConstraint(spec, wordScanner, inSplits);
RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -170,13 +171,16 @@
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
} else {
- IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
- IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo)
- ? new InMemorySortOperatorDescriptor(spec, keys, new UTF8StringNormalizedKeyComputerFactory(), cfs,
- wordDesc)
- : new ExternalSortOperatorDescriptor(spec, frameLimit, keys,
- new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc);
+ IBinaryComparatorFactory[] cfs =
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+ IOperatorDescriptor sorter =
+ "memsort".equalsIgnoreCase(algo)
+ ? new InMemorySortOperatorDescriptor(spec, keys,
+ new INormalizedKeyComputerFactory[] {
+ new UTF8StringNormalizedKeyComputerFactory() },
+ cfs, wordDesc)
+ : new ExternalSortOperatorDescriptor(spec, frameLimit, keys,
+ new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc);
createPartitionConstraint(spec, sorter, outSplits);
IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -195,9 +199,9 @@
}
IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
- IOperatorDescriptor writer = "text".equalsIgnoreCase(format)
- ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
- : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+ IOperatorDescriptor writer =
+ "text".equalsIgnoreCase(format) ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
+ : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
createPartitionConstraint(spec, writer, outSplits);
IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
index 8ab0708..7e56004 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
@@ -19,10 +19,7 @@
package org.apache.hyracks.examples.tpch.client;
-import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
-import static org.apache.hyracks.examples.tpch.client.Common.orderParserFactories;
-import static org.apache.hyracks.examples.tpch.client.Common.ordersDesc;
-import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
+import static org.apache.hyracks.examples.tpch.client.Common.*;
import java.util.EnumSet;
@@ -31,6 +28,7 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
@@ -131,12 +129,12 @@
createPartitionConstraint(spec, ordScanner, ordersSplits);
AbstractSorterOperatorDescriptor sorter;
if (usingHeapSorter && limit < Integer.MAX_VALUE) {
- sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields, null,
- SortFieldsComparatorFactories, ordersDesc);
+ sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields,
+ (INormalizedKeyComputerFactory) null, SortFieldsComparatorFactories, ordersDesc);
} else {
if (memBufferAlg.equalsIgnoreCase("bestfit")) {
- sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields,
- null, SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT,
+ sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,
+ SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT,
EnumFreeSlotPolicy.SMALLEST_FIT, limit);
} else if (memBufferAlg.equalsIgnoreCase("biggestfit")) {
sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,