add normalized key computer support in Pregelix
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/NormalizedKeyComputer.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/NormalizedKeyComputer.java
new file mode 100644
index 0000000..6fd0fe5
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/NormalizedKeyComputer.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.graph;
+
+/**
+ * Users can extend this interface to speedup the performance, e.g., the Alpha-sort optimization for cache locality.
+ * The normalized key is an unsigned integer (represented by a signed integer, though) obtained from the binary represetnation
+ * of the corresponding vertex id.
+ * Usually the normalized key can be obtained from the prefix bytes of the vertex id bytes.
+ *
+ * @author yingyib
+ */
+public interface NormalizedKeyComputer {
+
+ /**
+ * Get the normalized key from the byte region of a vertex id.
+ * The following three parameters represent the byte region of a vertex id.
+ *
+ * @param data
+ * @param start
+ * @param len
+ * @return the normalized key.
+ */
+ public int getNormalizedKey(byte[] data, int start, int len);
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 8b6d1b6..dde1a5e 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -22,6 +22,7 @@
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
@@ -56,6 +57,8 @@
public static final String PARTIAL_AGGREGATE_VALUE_CLASS = "pregelix.partialAggregateValueClass";
/** Final aggregate value class */
public static final String FINAL_AGGREGATE_VALUE_CLASS = "pregelix.finalAggregateValueClass";
+ /** The normalized key computer class */
+ public static final String NMK_COMPUTER_CLASS = "pregelix.nmkComputerClass";
/** num of vertices */
public static final String NUM_VERTICE = "pregelix.numVertices";
/** num of edges */
@@ -166,4 +169,13 @@
final public void setFrameSize(int frameSize) {
getConfiguration().setInt(FRAME_SIZE, frameSize);
}
+
+ /**
+ * Set the normalized key computer class
+ *
+ * @param nkcClass
+ */
+ final public void setNoramlizedKeyComputerClass(Class<?> nkcClass) {
+ getConfiguration().setClass(NMK_COMPUTER_CLASS, nkcClass, NormalizedKeyComputer.class);
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index ff9724d..6dfb416 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -23,6 +23,7 @@
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
@@ -143,6 +144,18 @@
}
/**
+ * Create a user-defined normalized key computer class
+ *
+ * @param conf
+ * Configuration to check
+ * @return Instantiated user-defined normalized key computer
+ */
+ public static NormalizedKeyComputer createNormalizedKeyComputer(Configuration conf) {
+ Class<? extends NormalizedKeyComputer> nmkClass = getNormalizedKeyComputerClass(conf);
+ return ReflectionUtils.newInstance(nmkClass, conf);
+ }
+
+ /**
* Create a global aggregator class
*
* @param conf
@@ -320,7 +333,22 @@
}
/**
- * Get the user's subclassed global aggregator's global value class.
+ * Get the user's subclassed normalized key computer class.
+ *
+ * @param conf
+ * Configuration to check
+ * @return User's normalized key computer class
+ */
+ @SuppressWarnings("unchecked")
+ public static Class<? extends NormalizedKeyComputer> getNormalizedKeyComputerClass(Configuration conf) {
+ if (conf == null)
+ conf = defaultConf;
+ return (Class<? extends NormalizedKeyComputer>) conf.getClass(PregelixJob.NMK_COMPUTER_CLASS,
+ NormalizedKeyComputer.class);
+ }
+
+ /**
+ * Get the user's subclassed normalized key computer class.
*
* @param conf
* Configuration to check
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
index b6c995a..4a4f5b3 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
@@ -20,8 +20,5 @@
public interface INormalizedKeyComputerFactoryProvider {
@SuppressWarnings("rawtypes")
- INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory(Class keyClass);
-
- @SuppressWarnings("rawtypes")
- INormalizedKeyComputerFactory getDescINormalizedKeyComputerFactory(Class keyClass);
+ INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory();
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 8f0019e..c913eff 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -70,7 +70,6 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.jobgen.provider.NormalizedKeyComputerFactoryProvider;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
@@ -212,8 +211,7 @@
*/
int[] sortFields = new int[1];
sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
.getClass());
@@ -289,8 +287,7 @@
*/
int[] sortFields = new int[1];
sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
.getClass());
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index ff2f6a0..b9d1d74 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -175,8 +175,7 @@
* construct local sort operator
*/
int[] keyFields = new int[] { 0 };
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil
- .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
@@ -385,8 +384,7 @@
/**
* construct local sort operator
*/
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil
- .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index ee385f1..bc5ab14 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -141,8 +141,7 @@
* construct local sort operator
*/
int[] keyFields = new int[] { 0 };
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil
- .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
@@ -345,8 +344,7 @@
/**
* construct local sort operator
*/
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil
- .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 40b5f45..a47101d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -143,8 +143,7 @@
* construct global sort operator
*/
int[] keyFields = new int[] { 0 };
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil
- .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
@@ -334,8 +333,7 @@
/**
* construct global sort operator
*/
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil
- .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 3351a2c..96cfaba 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -140,8 +140,7 @@
* construct local sort operator
*/
int[] keyFields = new int[] { 0 };
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil
- .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
@@ -348,8 +347,7 @@
/**
* construct local sort operator
*/
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil
- .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
index 63b7c6d..319d7cc 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
@@ -15,10 +15,14 @@
package edu.uci.ics.pregelix.core.jobgen;
+import org.apache.hadoop.conf.Configuration;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.pregelix.core.jobgen.provider.NormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
+import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNormalizedKeyComputerFactory;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class JobGenUtil {
@@ -31,8 +35,12 @@
* @param keyClass
* @return
*/
- public static INormalizedKeyComputerFactory getINormalizedKeyComputerFactory(int iteration, Class keyClass) {
- return NormalizedKeyComputerFactoryProvider.INSTANCE.getAscINormalizedKeyComputerFactory(keyClass);
+ public static INormalizedKeyComputerFactory getINormalizedKeyComputerFactory(Configuration conf) {
+ Class<? extends NormalizedKeyComputer> clazz = BspUtils.getNormalizedKeyComputerClass(conf);
+ if (clazz.equals(NormalizedKeyComputer.class)) {
+ return null;
+ }
+ return new VertexIdNormalizedKeyComputerFactory(clazz);
}
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java
deleted file mode 100644
index 0735593..0000000
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.core.jobgen.provider;
-
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.pregelix.core.base.INormalizedKeyComputerFactoryProvider;
-import edu.uci.ics.pregelix.runtime.touchpoint.VLongAscNormalizedKeyComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VLongDescNormalizedKeyComputerFactory;
-
-public class NormalizedKeyComputerFactoryProvider implements INormalizedKeyComputerFactoryProvider {
-
- public static INormalizedKeyComputerFactoryProvider INSTANCE = new NormalizedKeyComputerFactoryProvider();
-
- private NormalizedKeyComputerFactoryProvider() {
-
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory(Class keyClass) {
- if (keyClass.getName().indexOf("VLongWritable") > 0)
- return new VLongAscNormalizedKeyComputerFactory();
- else
- return null;
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public INormalizedKeyComputerFactory getDescINormalizedKeyComputerFactory(Class keyClass) {
- if (keyClass.getName().indexOf("VLongWritable") > 0)
- return new VLongDescNormalizedKeyComputerFactory();
- else
- return null;
- }
-}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index 74ae455..6adbf83 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -34,6 +34,7 @@
import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
import edu.uci.ics.pregelix.example.io.VLongWritable;
@@ -137,6 +138,7 @@
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
index b8cd953..08de520 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
@@ -30,6 +30,7 @@
import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
import edu.uci.ics.pregelix.example.io.VLongWritable;
@@ -109,6 +110,7 @@
job.setVertexClass(GraphMutationVertex.class);
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimpleGraphMutationVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index b6d4da7..752b23a 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -43,6 +43,7 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
import edu.uci.ics.pregelix.example.io.VLongWritable;
@@ -216,6 +217,7 @@
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index 0895386..8102c14 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -40,6 +40,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
import edu.uci.ics.pregelix.example.io.VLongWritable;
@@ -223,6 +224,7 @@
job.setVertexInputFormatClass(TextReachibilityVertexInputFormat.class);
job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
job.setMessageCombinerClass(ReachabilityVertex.SimpleReachibilityCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
Client.run(args, job);
System.out.println("reachable? " + readReachibilityResult(job.getConfiguration()));
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index 199870e..396acae 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -30,6 +30,7 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
import edu.uci.ics.pregelix.example.io.VLongWritable;
@@ -138,6 +139,7 @@
job.setVertexInputFormatClass(TextShortestPathsInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.getConfiguration().setLong(SOURCE_ID, 0);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
new file mode 100644
index 0000000..7d824ea
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.data;
+
+import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+
+/**
+ * @author yingyib
+ */
+public class VLongNormalizedKeyComputer implements NormalizedKeyComputer {
+
+ private static final int POSTIVE_LONG_MASK = (3 << 30);
+ private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+ private static final int NEGATIVE_LONG_MASK = (0 << 30);
+
+ @Override
+ public int getNormalizedKey(byte[] bytes, int start, int length) {
+ long value = SerDeUtils.readVLong(bytes, start, length);
+ int highValue = (int) (value >> 32);
+ if (highValue > 0) {
+ /**
+ * larger than Integer.MAX
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= POSTIVE_LONG_MASK;
+ return highNmk;
+ } else if (highValue == 0) {
+ /**
+ * smaller than Integer.MAX but >=0
+ */
+ int lowNmk = (int) value;
+ lowNmk >>= 2;
+ lowNmk |= NON_NEGATIVE_INT_MASK;
+ return lowNmk;
+ } else {
+ /**
+ * less than 0; TODO: have not optimized for that
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= NEGATIVE_LONG_MASK;
+ return highNmk;
+ }
+ }
+
+ private int getKey(int value) {
+ long unsignedFirstValue = (long) value;
+ int nmk = (int) ((unsignedFirstValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+ return nmk;
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
index 85a139e..2ff800e 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
@@ -39,6 +39,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.io.VLongWritable;
import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex;
@@ -290,6 +291,7 @@
job.setDynamicVertexValueSize(true);
job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
Client.run(args, job);
System.out.println("maximal cliques: \n" + readMaximalCliqueResult(job.getConfiguration()));
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
index d3db095..89bfee8 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
@@ -20,6 +20,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.io.VLongWritable;
/**
@@ -134,6 +135,7 @@
job.setGlobalAggregatorClass(TriangleCountingAggregator.class);
job.setVertexInputFormatClass(TextTriangleCountingInputFormat.class);
job.setVertexOutputFormatClass(TriangleCountingVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
Client.run(args, job);
System.out.println("triangle count: " + readTriangleCountingResult(job.getConfiguration()));
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 0a5b214..8fafede 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -33,6 +33,7 @@
import edu.uci.ics.pregelix.example.ReachabilityVertex;
import edu.uci.ics.pregelix.example.ReachabilityVertex.SimpleReachibilityVertexOutputFormat;
import edu.uci.ics.pregelix.example.ShortestPathsVertex;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
@@ -65,6 +66,7 @@
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -77,6 +79,7 @@
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -89,6 +92,7 @@
job.setVertexInputFormatClass(TextShortestPathsInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -101,6 +105,7 @@
job.setVertexClass(PageRankVertex.class);
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -113,6 +118,7 @@
job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -125,6 +131,7 @@
job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -137,6 +144,7 @@
job.setVertexInputFormatClass(TextReachibilityVertexInputFormat.class);
job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
job.setMessageCombinerClass(ReachabilityVertex.SimpleReachibilityCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -152,6 +160,7 @@
job.setVertexInputFormatClass(TextReachibilityVertexInputFormat.class);
job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
job.setMessageCombinerClass(ReachabilityVertex.SimpleReachibilityCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -166,6 +175,7 @@
job.setVertexInputFormatClass(SimulatedPageRankVertexInputFormat.class);
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -178,6 +188,7 @@
job.setVertexInputFormatClass(SimulatedPageRankVertexInputFormat.class);
job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -191,6 +202,7 @@
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setDynamicVertexValueSize(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
@@ -204,6 +216,7 @@
job.setGlobalAggregatorClass(TriangleCountingAggregator.class);
job.setVertexInputFormatClass(TextTriangleCountingInputFormat.class);
job.setVertexOutputFormatClass(TriangleCountingVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -216,6 +229,7 @@
job.setDynamicVertexValueSize(true);
job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -228,6 +242,7 @@
job.setDynamicVertexValueSize(true);
job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH4);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -240,6 +255,7 @@
job.setDynamicVertexValueSize(true);
job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH5);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -250,6 +266,7 @@
job.setVertexClass(GraphMutationVertex.class);
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimpleGraphMutationVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index 22ae6cf..decbde8 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>ConnectedComponents</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 50662f9..c7fec9f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>ConnectedComponents</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
index 9f51f6d..d5ec8f1 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>Graph Mutation</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
index 616c647..c0559d9 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>Maximal Clique</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
index 5621259..541806d 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>Maximal Clique 2</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
index d4f81ba..7214b3f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>Maximal Clique 3</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index 744e5b0..65e0b30 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>PageRank</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index b51bd98..9e1e0b0 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>PageRank</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index a9e43bd..ceea85b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>PageRank</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
index c1a04ae..c05a4da 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>PageRank</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index 410ea8b..ac0d508 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>PageRank</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
index 0332ec5..225429a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
index 4f280fc..bd9da92 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index 9e791e2..9acd7bc 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>ShortestPaths</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index 90caf6b..6c25575 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>ShortestPaths</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
index 0f44f4d..4a40a6a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
@@ -37,6 +37,7 @@
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
<property><name>mapred.job.name</name><value>Triangle Counting</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdNormalizedKeyComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..04e16ac
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdNormalizedKeyComputerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
+
+/**
+ * This class wraps the user-defined normalized key computer to calculate the normalized
+ * key of vertex ids.
+ *
+ * @author yingyib
+ */
+public class VertexIdNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+ private static final long serialVersionUID = 1L;
+ private Class<? extends NormalizedKeyComputer> nmkComputerClass;
+
+ public VertexIdNormalizedKeyComputerFactory(Class<? extends NormalizedKeyComputer> nmkComputerClass) {
+ this.nmkComputerClass = nmkComputerClass;
+ }
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ try {
+ final NormalizedKeyComputer nmkComputer = nmkComputerClass.newInstance();
+ return new INormalizedKeyComputer() {
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ return nmkComputer.getNormalizedKey(bytes, start, length);
+ }
+
+ };
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}