fix the cache miss problem in the sort merge reader
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index 9955a63..3f1aae2 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -44,9 +44,11 @@
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -142,27 +144,32 @@
int n = orderColumns.size();
int[] sortFields = new int[n];
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[n];
- {
- int j = 0;
- for (OrderColumn oc : orderColumns) {
- LogicalVariable var = oc.getColumn();
- sortFields[j] = opSchema.findVariable(var);
- Object type = env.getVarType(var);
- IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
- comparatorFactories[j] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
- j++;
+
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+
+ int j = 0;
+ for (OrderColumn oc : orderColumns) {
+ LogicalVariable var = oc.getColumn();
+ sortFields[j] = opSchema.findVariable(var);
+ Object type = env.getVarType(var);
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ comparatorFactories[j] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+ if (j == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, oc.getOrder() == OrderKind.ASC);
}
+ j++;
}
IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields,
- comparatorFactories);
+ comparatorFactories, nkcf);
return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
}
-
+
public List<LogicalVariable> getPartitionFields() {
return partitionFields;
}
-
+
public List<OrderColumn> getOrderColumns() {
return orderColumns;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
index dc345ac..b03b99d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
@@ -39,9 +39,11 @@
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -128,6 +130,10 @@
IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
IBinaryHashFunctionFactory[] hashFuns = new IBinaryHashFunctionFactory[n];
IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+
for (int i = 0; i < n; i++) {
sortFields[i] = opSchema.findVariable(sortColumns[i].getColumn());
Object type = env.getVarType(sortColumns[i].getColumn());
@@ -135,9 +141,12 @@
comps[i] = bcfp.getBinaryComparatorFactory(type, sortColumns[i].getOrder() == OrderKind.ASC);
IBinaryHashFunctionFactoryProvider bhffp = context.getBinaryHashFunctionFactoryProvider();
hashFuns[i] = bhffp.getBinaryHashFunctionFactory(type);
+ if (i == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, sortColumns[i].getOrder() == OrderKind.ASC);
+ }
}
ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(sortFields, hashFuns);
- IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps);
+ IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.ONE);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 09eeab6..1a1d41e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -320,8 +320,10 @@
collector.close();
}
} catch (HyracksException e) {
+ e.printStackTrace();
throw new HyracksDataException(e);
} catch (Exception e) {
+ e.printStackTrace();
throw new HyracksDataException(e);
}
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java
index 9faef09..22615d2 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java
@@ -18,6 +18,9 @@
public class IntSerDeUtils {
public static int getInt(byte[] bytes, int offset) {
+ if (offset + 3 > bytes.length) {
+ System.out.println("offset " + offset + " " + bytes.length);
+ }
return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
+ ((bytes[offset + 3] & 0xff) << 0);
}
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
index 04b91b2..b8e9ec0 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -53,8 +53,8 @@
private final MarshalledWritable<Configuration> config;
private final IInputSplitProviderFactory factory;
- public MapperOperatorDescriptor(IOperatorDescriptorRegistry spec, int jobId, MarshalledWritable<Configuration> config,
- IInputSplitProviderFactory factory) throws HyracksDataException {
+ public MapperOperatorDescriptor(IOperatorDescriptorRegistry spec, int jobId,
+ MarshalledWritable<Configuration> config, IInputSplitProviderFactory factory) throws HyracksDataException {
super(spec, 0, 1);
this.jobId = jobId;
this.config = config;
@@ -114,7 +114,8 @@
runGen.nextFrame(frame);
fta.reset(frame, true);
if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + frame.capacity() + ")");
+ throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
+ + frame.capacity() + ")");
}
}
}
@@ -224,7 +225,7 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getFrameSorter(),
- runGen.getRuns(), new int[] { 0 }, comparators,
+ runGen.getRuns(), new int[] { 0 }, comparators, null,
helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
merger.process();
}
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
index 30ba3ec..3bb78f9 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -108,7 +108,7 @@
runs.add(rfw.createReader());
}
RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
- ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators,
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators, null,
recordDescriptor, framesLimit, rfw);
merger.process();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
index 235b0d0..2dda9cc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
@@ -31,18 +32,21 @@
private final int nSenders;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
+ private final INormalizedKeyComputer nmkComputer;
private final RecordDescriptor recordDescriptor;
private final IPartitionBatchManager pbm;
private RunMergingFrameReader merger;
public SortMergeFrameReader(IHyracksTaskContext ctx, int maxConcurrentMerges, int nSenders, int[] sortFields,
- IBinaryComparator[] comparators, RecordDescriptor recordDescriptor, IPartitionBatchManager pbm) {
+ IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDescriptor,
+ IPartitionBatchManager pbm) {
this.ctx = ctx;
this.maxConcurrentMerges = maxConcurrentMerges;
this.nSenders = nSenders;
this.sortFields = sortFields;
this.comparators = comparators;
+ this.nmkComputer = nmkComputer;
this.recordDescriptor = recordDescriptor;
this.pbm = pbm;
}
@@ -57,7 +61,7 @@
List<IFrameReader> batch = new ArrayList<IFrameReader>();
pbm.getNextBatch(batch, nSenders);
merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames, sortFields,
- comparators, recordDescriptor);
+ comparators, nmkComputer, recordDescriptor);
} else {
// multi level merge.
throw new HyracksDataException("Not yet supported");
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
index 32269bd..1cf402b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -39,19 +41,23 @@
private final ITuplePartitionComputerFactory tpcf;
private final int[] sortFields;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final INormalizedKeyComputerFactory nkcFactory;
private final boolean stable;
- public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf,
- int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
- this(spec, tpcf, sortFields, comparatorFactories, false);
+ public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ITuplePartitionComputerFactory tpcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory nkcFactory) {
+ this(spec, tpcf, sortFields, comparatorFactories, nkcFactory, false);
}
- public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf,
- int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, boolean stable) {
+ public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ITuplePartitionComputerFactory tpcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+ INormalizedKeyComputerFactory nkcFactory, boolean stable) {
super(spec);
this.tpcf = tpcf;
this.sortFields = sortFields;
this.comparatorFactories = comparatorFactories;
+ this.nkcFactory = nkcFactory;
this.stable = stable;
}
@@ -71,9 +77,10 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ INormalizedKeyComputer nmkComputer = nkcFactory == null ? null : nkcFactory.createNormalizedKeyComputer();
IPartitionBatchManager pbm = new NonDeterministicPartitionBatchManager(nProducerPartitions);
IFrameReader sortMergeFrameReader = new SortMergeFrameReader(ctx, nProducerPartitions, nProducerPartitions,
- sortFields, comparators, recordDesc, pbm);
+ sortFields, comparators, nmkComputer, recordDesc, pbm);
BitSet expectedPartitions = new BitSet();
expectedPartitions.set(0, nProducerPartitions);
return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, sortMergeFrameReader, pbm);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index 21b3b60..bb3d0f3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -25,6 +25,8 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -47,6 +49,7 @@
private final Object stateId;
private final int[] keyFields;
private final IBinaryComparator[] comparators;
+ private final INormalizedKeyComputer nmkComputer;
private final AggregateState aggregateState;
private final ArrayTupleBuilder tupleBuilder;
private final int[] storedKeys;
@@ -76,7 +79,7 @@
private final FrameTupleAccessor outFrameAccessor;
ExternalGroupMergeOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
- IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
+ IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nmkFactory, int[] keyFields,
IAggregatorDescriptorFactory mergerFactory, boolean isOutputSorted, int framesLimit,
RecordDescriptor outRecordDescriptor) throws HyracksDataException {
this.stateId = stateId;
@@ -85,6 +88,7 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ this.nmkComputer = nmkFactory == null ? null : nmkFactory.createNormalizedKeyComputer();
int[] keyFieldsInPartialResults = new int[keyFields.length];
for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
keyFieldsInPartialResults[i] = i;
@@ -181,7 +185,7 @@
FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), outRecordDescriptor,
- runNumber, comparator);
+ runNumber, comparator, keyFields, nmkComputer);
/**
* current tuple index in each run
*/
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index b914702..9296ffa 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -130,8 +130,8 @@
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
return new ExternalGroupMergeOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
- AGGREGATE_ACTIVITY_ID), partition), comparatorFactories, keyFields, mergerFactory, isOutputSorted,
- framesLimit, recordDescriptors[0]);
+ AGGREGATE_ACTIVITY_ID), partition), comparatorFactories, firstNormalizerFactory, keyFields,
+ mergerFactory, isOutputSorted, framesLimit, recordDescriptors[0]);
}
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index f80a82c..f071b16 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -171,9 +172,11 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
+ : firstKeyNormalizerFactory.createNormalizedKeyComputer();
int necessaryFrames = Math.min(runs.size() + 2, framesLimit);
ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
- comparators, recordDescriptors[0], necessaryFrames, writer);
+ comparators, nmkComputer, recordDescriptors[0], necessaryFrames, writer);
merger.process();
}
};
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 510dfd6..77efe89 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -59,6 +60,7 @@
private final List<IFrameReader> runs;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
+ private final INormalizedKeyComputer nmkComputer;
private final RecordDescriptor recordDesc;
private final int framesLimit;
private final IFrameWriter writer;
@@ -77,13 +79,14 @@
// Constructor for external sort, no replacement selection
public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<IFrameReader> runs,
- int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit,
- IFrameWriter writer) {
+ int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
+ RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
this.ctx = ctx;
this.frameSorter = frameSorter;
this.runs = new LinkedList<IFrameReader>(runs);
this.sortFields = sortFields;
this.comparators = comparators;
+ this.nmkComputer = nmkComputer;
this.recordDesc = recordDesc;
this.framesLimit = framesLimit;
this.writer = writer;
@@ -92,11 +95,13 @@
// Constructor for external sort with replacement selection
public ExternalSortRunMerger(IHyracksTaskContext ctx, int outputLimit, List<IFrameReader> runs, int[] sortFields,
- IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
+ IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
+ int framesLimit, IFrameWriter writer) {
this.ctx = ctx;
this.runs = new LinkedList<IFrameReader>(runs);
this.sortFields = sortFields;
this.comparators = comparators;
+ this.nmkComputer = nmkComputer;
this.recordDesc = recordDesc;
this.framesLimit = framesLimit;
this.writer = writer;
@@ -162,7 +167,7 @@
private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields, comparators,
- recordDesc);
+ nmkComputer, recordDesc);
merger.open();
try {
while (merger.nextFrame(outFrame)) {
@@ -263,7 +268,7 @@
runCursors[i] = runs.get(i);
}
RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
- comparators, recordDesc);
+ comparators, nmkComputer, recordDesc);
merger.open();
try {
while (merger.nextFrame(outFrame)) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index bff2e21..ef1ae88 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -201,9 +202,11 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
+ : firstKeyNormalizerFactory.createNormalizedKeyComputer();
int necessaryFrames = Math.min(runs.size() + 2, memSize);
ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, outputLimit, runs, sortFields,
- comparators, recordDescriptors[0], necessaryFrames, writer);
+ comparators, nmkComputer, recordDescriptors[0], necessaryFrames, writer);
merger.processWithReplacementSelection();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index 16b3c12..00fbe9b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -35,6 +36,7 @@
private final List<ByteBuffer> inFrames;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
+ private final INormalizedKeyComputer nmkComputer;
private final RecordDescriptor recordDesc;
private final FrameTupleAppender outFrameAppender;
private ReferencedPriorityQueue topTuples;
@@ -42,12 +44,14 @@
private FrameTupleAccessor[] tupleAccessors;
public RunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, List<ByteBuffer> inFrames,
- int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc) {
+ int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
+ RecordDescriptor recordDesc) {
this.ctx = ctx;
this.runCursors = runCursors;
this.inFrames = inFrames;
this.sortFields = sortFields;
this.comparators = comparators;
+ this.nmkComputer = nmkComputer;
this.recordDesc = recordDesc;
outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
}
@@ -56,7 +60,8 @@
public void open() throws HyracksDataException {
tupleAccessors = new FrameTupleAccessor[runCursors.length];
Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator);
+ topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator,
+ sortFields, nmkComputer);
tupleIndexes = new int[runCursors.length];
for (int i = 0; i < runCursors.length; i++) {
tupleIndexes[i] = 0;
@@ -143,21 +148,25 @@
private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
return new Comparator<ReferenceEntry>() {
public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
+ int nmk1 = tp1.getNormalizedKey();
+ int nmk2 = tp1.getNormalizedKey();
+ if (nmk1 > nmk2) {
+ return 1;
+ }
+ if (nmk1 < nmk2) {
+ return -1;
+ }
+
FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
- int j1 = tp1.getTupleIndex();
- int j2 = tp2.getTupleIndex();
byte[] b1 = fta1.getBuffer().array();
byte[] b2 = fta2.getBuffer().array();
+ int[] tPointers1 = tp1.getTPointers();
+ int[] tPointers2 = tp2.getTPointers();
+
for (int f = 0; f < sortFields.length; ++f) {
- int fIdx = sortFields[f];
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ int c = comparators[f].compare(b1, tPointers1[2 * f + 1], tPointers1[2 * f + 2], b2,
+ tPointers2[2 * f + 1], tPointers2[2 * f + 2]);
if (c != 0) {
return c;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
index c1efa91..c049c8b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
@@ -14,18 +14,24 @@
*/
package edu.uci.ics.hyracks.dataflow.std.util;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
public class ReferenceEntry {
private final int runid;
private FrameTupleAccessor acccessor;
private int tupleIndex;
+ private int[] tPointers;
- public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex) {
+ public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
+ INormalizedKeyComputer nmkComputer) {
super();
this.runid = runid;
this.acccessor = fta;
- this.tupleIndex = tupleIndex;
+ this.tPointers = new int[1 + 2 * keyFields.length];
+ if (fta != null) {
+ initTPointer(fta, tupleIndex, keyFields, nmkComputer);
+ }
}
public int getRunid() {
@@ -40,11 +46,38 @@
this.acccessor = fta;
}
+ public int[] getTPointers() {
+ return tPointers;
+ }
+
public int getTupleIndex() {
return tupleIndex;
}
- public void setTupleIndex(int tupleIndex) {
+ public int getNormalizedKey() {
+ return tPointers[0];
+ }
+
+ public void setTupleIndex(int tupleIndex, int[] keyFields, INormalizedKeyComputer nmkComputer) {
this.tupleIndex = tupleIndex;
+ initTPointer(acccessor, tupleIndex, keyFields, nmkComputer);
+ }
+
+ private void initTPointer(FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
+ INormalizedKeyComputer nmkComputer) {
+ byte[] b1 = fta.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = keyFields[f];
+ tPointers[2 * f + 1] = fta.getTupleStartOffset(tupleIndex) + fta.getFieldSlotsLength()
+ + fta.getFieldStartOffset(tupleIndex, fIdx);
+ tPointers[2 * f + 2] = fta.getFieldEndOffset(tupleIndex, fIdx) - fta.getFieldStartOffset(tupleIndex, fIdx);
+ if (f == 0) {
+ if (nmkComputer != null) {
+ tPointers[0] = nmkComputer.normalize(b1, tPointers[1], tPointers[2]);
+ } else {
+ tPointers[0] = 0;
+ }
+ }
+ }
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
index 7767ace..225f583 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
@@ -18,6 +18,7 @@
import java.util.BitSet;
import java.util.Comparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -30,21 +31,25 @@
private int nItems;
private final Comparator<ReferenceEntry> comparator;
+ private final INormalizedKeyComputer nmkComputer;
+ private final int[] keyFields;
public ReferencedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
- Comparator<ReferenceEntry> comparator) {
+ Comparator<ReferenceEntry> comparator, int[] keyFields, INormalizedKeyComputer nmkComputer) {
this.frameSize = frameSize;
this.recordDescriptor = recordDescriptor;
if (initSize < 1)
throw new IllegalArgumentException();
this.comparator = comparator;
+ this.nmkComputer = nmkComputer;
+ this.keyFields = keyFields;
nItems = initSize;
size = (initSize + 1) & 0xfffffffe;
entries = new ReferenceEntry[size];
runAvail = new BitSet(size);
runAvail.set(0, initSize, true);
for (int i = 0; i < size; i++) {
- entries[i] = new ReferenceEntry(i, null, -1);
+ entries[i] = new ReferenceEntry(i, null, -1, keyFields, nmkComputer);
}
}
@@ -71,7 +76,7 @@
entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
}
entry.getAccessor().reset(fta.getBuffer());
- entry.setTupleIndex(tIndex);
+ entry.setTupleIndex(tIndex, keyFields, nmkComputer);
add(entry);
return entry.getRunid();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index 380ba29..a2ef99a 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -93,8 +94,8 @@
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
new IBinaryComparatorFactory[] {
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0,
- printer, 0);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
runTest(spec);
}
@@ -148,8 +149,8 @@
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
new IBinaryComparatorFactory[] {
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0, filter,
- 0);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), filter, 0, printer, 0);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 48db176..faa55e8 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -89,7 +90,8 @@
new int[] { 1 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }), new int[] { 1 },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) }), sorter, 0, printer, 0);
+ .of(UTF8StringPointable.FACTORY) }, new UTF8StringNormalizedKeyComputerFactory()),
+ sorter, 0, printer, 0);
runTest(spec);
}
@@ -138,8 +140,8 @@
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
new IBinaryComparatorFactory[] {
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0,
- printer, 0);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
runTest(spec);
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
index c61c15b..61c5a1c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
@@ -156,8 +156,8 @@
jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
jobSpec.connect(new MToNPartitioningMergingConnectorDescriptor(jobSpec, new FieldHashPartitionComputerFactory(
new int[] { 0 }, new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
- new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }), sortOperator,
- 0, writeOperator, 0);
+ new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null),
+ sortOperator, 0, writeOperator, 0);
jobSpec.addRoot(writeOperator);
IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST,
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
index 3e43fb1..baa4dc7 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -163,8 +163,8 @@
jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
jobSpec.connect(new MToNPartitioningMergingConnectorDescriptor(jobSpec, new FieldHashPartitionComputerFactory(
new int[] { 0 }, new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
- new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }), sortOperator,
- 0, writeOperator, 0);
+ new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null),
+ sortOperator, 0, writeOperator, 0);
jobSpec.addRoot(writeOperator);
IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 4863378..a188f64 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -291,7 +291,7 @@
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
- comparatorFactories), sorter, 0, writer, 0);
+ comparatorFactories, nkmFactory), sorter, 0, writer, 0);
spec.setFrameSize(frameSize);
return spec;
}
@@ -355,10 +355,11 @@
*/
int[] sortFields = new int[1];
sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
- comparatorFactories), scanner, 0, writer, 0);
+ comparatorFactories, nkmFactory), scanner, 0, writer, 0);
spec.setFrameSize(frameSize);
return spec;
}
@@ -739,8 +740,7 @@
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
- sort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 9389f62..195e595 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -282,7 +282,7 @@
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories, nkmFactory),
localGby, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
@@ -503,7 +503,7 @@
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories, nkmFactory),
localGby, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 287b797..cc90f2c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -241,8 +241,8 @@
* connect the group-by operator
*/
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
- localGby, 0, globalGby, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+ nkmFactory), localGby, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
@@ -440,8 +440,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
- localGby, 0, globalGby, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+ nkmFactory), localGby, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
diff --git a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 4c7f91d..f599996 100644
--- a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -34,6 +35,7 @@
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -89,6 +91,7 @@
UTF8StringPointable.FACTORY);
private IBinaryComparatorFactory stringComparatorFactory = new PointableBinaryComparatorFactory(
UTF8StringPointable.FACTORY);
+ private INormalizedKeyComputerFactory nmkFactory = new UTF8StringNormalizedKeyComputerFactory();
private void cleanupStores() throws IOException {
FileUtils.forceMkdir(new File("teststore"));
@@ -211,7 +214,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, sorter, 0);
IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+ new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+ nmkFactory);
spec.connect(joinWriterConn, sorter, 0, writer, 0);
spec.addRoot(writer);
@@ -284,8 +288,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
- sorter, 0, writer, 0);
+ sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+ nmkFactory), sorter, 0, writer, 0);
spec.addRoot(writer);
runTest(spec);
@@ -368,11 +372,11 @@
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
- sorter, 0, join, 0);
+ keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+ nmkFactory), sorter, 0, join, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
- join, 0, writer, 0);
+ keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+ nmkFactory), join, 0, writer, 0);
spec.addRoot(writer);
runTest(spec);
@@ -477,7 +481,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), project, 0, sorter, 0);
IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 9 },
- new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+ new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+ nmkFactory);
spec.connect(joinWriterConn, sorter, 0, writer, 0);
spec.addRoot(writer);
@@ -573,7 +578,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
keyFields, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
- UTF8StringPointable.FACTORY) }), sortFields, comparatorFactories), sorter, 0, join, 0);
+ UTF8StringPointable.FACTORY) }), sortFields, comparatorFactories, nmkFactory), sorter, 0, join,
+ 0);
IBinaryComparatorFactory[] mergeComparatorFactories = new IBinaryComparatorFactory[2];
mergeComparatorFactories[0] = new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY);
@@ -581,7 +587,8 @@
int[] mergeFields = new int[] { 9, 0 };
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
new int[] { 9 }, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
- UTF8StringPointable.FACTORY) }), mergeFields, comparatorFactories), join, 0, writer, 0);
+ UTF8StringPointable.FACTORY) }), mergeFields, comparatorFactories, nmkFactory), join, 0,
+ writer, 0);
spec.addRoot(writer);
runTest(spec);