use raw binary comparator internally for pregelix
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index c14f23a..3e4ecf8 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -264,7 +264,7 @@
throw new TreeIndexException("Cannot bulk-load a non-empty tree.");
}
- this.cmp = MultiComparator.createIgnoreFieldLength(cmpFactories);
+ this.cmp = MultiComparator.create(cmpFactories);
leafFrame.setMultiComparator(cmp);
interiorFrame.setMultiComparator(cmp);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index d89d577..aa50645 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -65,6 +65,7 @@
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -96,7 +97,7 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.RawNormalizedKeyComputerFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -197,17 +198,14 @@
return jobId;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public JobSpecification generateCreatingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
JobSpecification spec = new JobSpecification();
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
int[] keyFields = new int[1];
keyFields[0] = 0;
@@ -265,8 +263,7 @@
sortFields[0] = 0;
INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
nkmFactory, comparatorFactories, recordDescriptor);
ClusterConfig.setLocationConstraint(spec, sorter);
@@ -297,7 +294,7 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({ "rawtypes" })
public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
@@ -325,8 +322,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -366,7 +362,7 @@
}
public JobSpecification scanIndexWriteGraph() throws HyracksException {
- JobSpecification spec = scanIndexWriteToHDFS(conf);
+ JobSpecification spec = scanIndexWriteToHDFS(conf, false);
return spec;
}
@@ -382,7 +378,7 @@
FileSystem dfs = FileSystem.get(tmpJob.getConfiguration());
dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)), true);
- JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+ JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration(), true);
dfs.delete(new Path(BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration)), true);
JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
@@ -525,10 +521,9 @@
*/
int[] sortFields = new int[1];
sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
+ INormalizedKeyComputerFactory nkmFactory = RawNormalizedKeyComputerFactory.INSTANCE;
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
nkmFactory, comparatorFactories, recordDescriptor);
ClusterConfig.setLocationConstraint(spec, sorter);
@@ -558,8 +553,9 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private JobSpecification scanIndexWriteToHDFS(Configuration conf) throws HyracksDataException, HyracksException {
+ @SuppressWarnings({ "rawtypes" })
+ private JobSpecification scanIndexWriteToHDFS(Configuration conf, boolean ckpointing) throws HyracksDataException,
+ HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
JobSpecification spec = new JobSpecification();
@@ -586,8 +582,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -599,6 +594,18 @@
null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
+ ExternalSortOperatorDescriptor sort = null;
+ if (!ckpointing) {
+ int[] keyFields = new int[] { 0 };
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getFinalNormalizedKeyComputerFactory(conf);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields, nkmFactory, sortCmpFactories,
+ recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+ }
+
/**
* construct write file operator
*/
@@ -612,7 +619,12 @@
* connect operator descriptors
*/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+ if (!ckpointing) {
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sort, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, writer, 0);
+ } else {
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+ }
spec.setFrameSize(frameSize);
return spec;
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index bbd3a7d..32494dc 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -52,6 +52,7 @@
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
@@ -63,7 +64,6 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -98,7 +98,6 @@
super(job);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
@@ -130,8 +129,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
@@ -301,7 +299,6 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
@@ -317,8 +314,7 @@
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
@@ -634,7 +630,7 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({ "rawtypes" })
private JobSpecification generateSecondaryBTreeCheckpoint(int lastSuccessfulIteration, PregelixJob job)
throws HyracksException {
job.setOutputFormatClass(SequenceFileOutputFormat.class);
@@ -669,8 +665,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), msgListClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 0bccde5..26142a3 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -43,7 +44,6 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -75,7 +75,6 @@
super(job);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
@@ -102,8 +101,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -259,7 +257,6 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
@@ -275,8 +272,7 @@
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ae2621e..ec0cff4 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -33,6 +33,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -42,7 +43,6 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -74,7 +74,6 @@
super(job);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
@@ -101,8 +100,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -248,7 +246,6 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
@@ -264,8 +261,7 @@
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 7645cd6..e918ede 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -33,6 +33,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -42,7 +43,6 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -74,7 +74,6 @@
super(job);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
@@ -101,8 +100,7 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
ITypeTraits[] typeTraits = new ITypeTraits[2];
@@ -262,7 +260,6 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
@@ -278,8 +275,7 @@
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ comparatorFactories[0] = RawBinaryComparatorFactory.INSTANCE;
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
index c4f54fe..3478713 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
@@ -19,12 +19,14 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.RawNormalizedKeyComputerFactory;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNormalizedKeyComputerFactory;
-@SuppressWarnings({ "rawtypes", "unchecked" })
+@SuppressWarnings({ "rawtypes" })
public class JobGenUtil {
/**
@@ -36,11 +38,7 @@
* @return
*/
public static INormalizedKeyComputerFactory getINormalizedKeyComputerFactory(Configuration conf) {
- Class<? extends NormalizedKeyComputer> clazz = BspUtils.getNormalizedKeyComputerClass(conf);
- if (clazz.equals(NormalizedKeyComputer.class)) {
- return null;
- }
- return new VertexIdNormalizedKeyComputerFactory(clazz);
+ return RawNormalizedKeyComputerFactory.INSTANCE;
}
/**
@@ -52,6 +50,33 @@
* @return
*/
public static IBinaryComparatorFactory getIBinaryComparatorFactory(int iteration, Class keyClass) {
+ return RawBinaryComparatorFactory.INSTANCE;
+ }
+
+ /**
+ * get normalized key factory for the final output job
+ *
+ * @param iteration
+ * @param keyClass
+ * @return
+ */
+ public static INormalizedKeyComputerFactory getFinalNormalizedKeyComputerFactory(Configuration conf) {
+ Class<? extends NormalizedKeyComputer> clazz = BspUtils.getNormalizedKeyComputerClass(conf);
+ if (clazz.equals(NormalizedKeyComputer.class)) {
+ return null;
+ }
+ return new VertexIdNormalizedKeyComputerFactory(clazz);
+ }
+
+ /**
+ * get comparator for the final output job
+ *
+ * @param iteration
+ * @param keyClass
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ public static IBinaryComparatorFactory getFinalBinaryComparatorFactory(Class keyClass) {
return new WritableComparingBinaryComparatorFactory(keyClass);
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawNormalizedKeyComputerFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..a58f8bc
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawNormalizedKeyComputerFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class RawNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+ private static final long serialVersionUID = 1L;
+ public static final INormalizedKeyComputerFactory INSTANCE = new RawNormalizedKeyComputerFactory();
+
+ private RawNormalizedKeyComputerFactory() {
+
+ }
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ //int value = IntegerSerializerDeserializer.getInt(bytes, start);
+ return 0;
+ }
+
+ };
+ }
+
+}