fix the cache miss problem in the sort merge reader
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 4863378..a188f64 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -291,7 +291,7 @@
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
- comparatorFactories), sorter, 0, writer, 0);
+ comparatorFactories, nkmFactory), sorter, 0, writer, 0);
spec.setFrameSize(frameSize);
return spec;
}
@@ -355,10 +355,11 @@
*/
int[] sortFields = new int[1];
sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
- comparatorFactories), scanner, 0, writer, 0);
+ comparatorFactories, nkmFactory), scanner, 0, writer, 0);
spec.setFrameSize(frameSize);
return spec;
}
@@ -739,8 +740,7 @@
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
- sort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 9389f62..195e595 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -282,7 +282,7 @@
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories, nkmFactory),
localGby, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
@@ -503,7 +503,7 @@
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories, nkmFactory),
localGby, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 287b797..cc90f2c 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -241,8 +241,8 @@
* connect the group-by operator
*/
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
- localGby, 0, globalGby, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+ nkmFactory), localGby, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
@@ -440,8 +440,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
- localGby, 0, globalGby, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+ nkmFactory), localGby, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 4c7f91d..f599996 100644
--- a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -34,6 +35,7 @@
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -89,6 +91,7 @@
UTF8StringPointable.FACTORY);
private IBinaryComparatorFactory stringComparatorFactory = new PointableBinaryComparatorFactory(
UTF8StringPointable.FACTORY);
+ private INormalizedKeyComputerFactory nmkFactory = new UTF8StringNormalizedKeyComputerFactory();
private void cleanupStores() throws IOException {
FileUtils.forceMkdir(new File("teststore"));
@@ -211,7 +214,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, sorter, 0);
IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+ new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+ nmkFactory);
spec.connect(joinWriterConn, sorter, 0, writer, 0);
spec.addRoot(writer);
@@ -284,8 +288,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
- sorter, 0, writer, 0);
+ sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+ nmkFactory), sorter, 0, writer, 0);
spec.addRoot(writer);
runTest(spec);
@@ -368,11 +372,11 @@
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
- sorter, 0, join, 0);
+ keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+ nmkFactory), sorter, 0, join, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
- join, 0, writer, 0);
+ keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+ nmkFactory), join, 0, writer, 0);
spec.addRoot(writer);
runTest(spec);
@@ -477,7 +481,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), project, 0, sorter, 0);
IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 9 },
- new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+ new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+ nmkFactory);
spec.connect(joinWriterConn, sorter, 0, writer, 0);
spec.addRoot(writer);
@@ -573,7 +578,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
keyFields, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
- UTF8StringPointable.FACTORY) }), sortFields, comparatorFactories), sorter, 0, join, 0);
+ UTF8StringPointable.FACTORY) }), sortFields, comparatorFactories, nmkFactory), sorter, 0, join,
+ 0);
IBinaryComparatorFactory[] mergeComparatorFactories = new IBinaryComparatorFactory[2];
mergeComparatorFactories[0] = new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY);
@@ -581,7 +587,8 @@
int[] mergeFields = new int[] { 9, 0 };
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
new int[] { 9 }, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
- UTF8StringPointable.FACTORY) }), mergeFields, comparatorFactories), join, 0, writer, 0);
+ UTF8StringPointable.FACTORY) }), mergeFields, comparatorFactories, nmkFactory), join, 0,
+ writer, 0);
spec.addRoot(writer);
runTest(spec);