address Vinayak's code review comments
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractHeartbeatWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractHeartbeatWork.java
new file mode 100644
index 0000000..7605295
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractHeartbeatWork.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public abstract class AbstractHeartbeatWork extends SynchronizableWork {
+
+    private final ClusterControllerService ccs;
+    private final String nodeId;
+    private final HeartbeatData hbData;
+
+    public AbstractHeartbeatWork(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
+        this.ccs = ccs;
+        this.nodeId = nodeId;
+        this.hbData = hbData;
+    }
+
+    @Override
+    public void doRun() {
+        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+        NodeControllerState state = nodeMap.get(nodeId);
+        if (state != null) {
+            state.notifyHeartbeat(hbData);
+        }
+        runWork();
+    }
+
+    public abstract void runWork();
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index 58aaa57..bcb278b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -29,15 +29,15 @@
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
 import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
-public abstract class AbstractTaskLifecycleWork extends AbstractWork {
+public abstract class AbstractTaskLifecycleWork extends AbstractHeartbeatWork {
     protected final ClusterControllerService ccs;
     protected final JobId jobId;
     protected final TaskAttemptId taId;
     protected final String nodeId;
 
     public AbstractTaskLifecycleWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId) {
+        super(ccs, nodeId, null);
         this.ccs = ccs;
         this.jobId = jobId;
         this.taId = taId;
@@ -45,7 +45,7 @@
     }
 
     @Override
-    public final void run() {
+    public final void runWork() {
         JobRun run = ccs.getActiveRunMap().get(jobId);
         if (run != null) {
             TaskId tid = taId.getTaskId();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
index 5109078..c4d202f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -21,14 +21,12 @@
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.messages.IMessage;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
 /**
  * @author rico
  */
-public class ApplicationMessageWork extends AbstractWork {
+public class ApplicationMessageWork extends AbstractHeartbeatWork {
 
     private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
     private byte[] message;
@@ -37,6 +35,7 @@
     private ClusterControllerService ccs;
 
     public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId, String nodeId) {
+        super(ccs, nodeId, null);
         this.ccs = ccs;
         this.deploymentId = deploymentId;
         this.nodeId = nodeId;
@@ -44,7 +43,7 @@
     }
 
     @Override
-    public void run() {
+    public void runWork() {
         final ICCApplicationContext ctx = ccs.getApplicationContext();
         try {
             final IMessage data = (IMessage) DeploymentUtils.deserialize(message, deploymentId, ctx);
@@ -58,7 +57,6 @@
             LOGGER.log(Level.WARNING, "Error in stats reporting", e);
             throw new RuntimeException(e);
         }
-        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 22c3348..46a7c16 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -49,7 +49,7 @@
 
     @Override
     public void run() {
-        LOGGER.warning("Cleanup for JobRun with id: " + jobId);
+        LOGGER.info("Cleanup for JobRun with id: " + jobId);
         final JobRun run = ccs.getActiveRunMap().get(jobId);
         if (run == null) {
             LOGGER.warning("Unable to find JobRun with id: " + jobId);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 66186ca..2d6bdea 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -28,10 +28,8 @@
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
-public class JobletCleanupNotificationWork extends AbstractWork {
+public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
     private static final Logger LOGGER = Logger.getLogger(JobletCleanupNotificationWork.class.getName());
 
     private ClusterControllerService ccs;
@@ -39,14 +37,14 @@
     private String nodeId;
 
     public JobletCleanupNotificationWork(ClusterControllerService ccs, JobId jobId, String nodeId) {
+        super(ccs, nodeId, null);
         this.ccs = ccs;
         this.jobId = jobId;
         this.nodeId = nodeId;
     }
 
     @Override
-    public void run() {
-        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
+    public void runWork() {
         final JobRun run = ccs.getActiveRunMap().get(jobId);
         Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
         if (!cleanupPendingNodes.remove(nodeId)) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
index eef6b96..8ef8f66 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -17,24 +17,17 @@
 import java.util.logging.Level;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
-public class NodeHeartbeatWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
-    private final String nodeId;
-    private final HeartbeatData hbData;
+public class NodeHeartbeatWork extends AbstractHeartbeatWork {
 
     public NodeHeartbeatWork(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
-        this.ccs = ccs;
-        this.nodeId = nodeId;
-        this.hbData = hbData;
+        super(ccs, nodeId, hbData);
     }
 
     @Override
-    protected void doRun() throws Exception {
-        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, hbData);
+    public void runWork() {
+
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
index e400fa8..c35f385 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -17,17 +17,15 @@
 
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
 /***
  * This is the work happens on the CC when CC gets a deployment or undeployment notification status message from one NC.
  * 
  * @author yingyib
  */
-public class NotifyDeployBinaryWork extends AbstractWork {
+public class NotifyDeployBinaryWork extends AbstractHeartbeatWork {
 
     private final ClusterControllerService ccs;
     private final String nodeId;
@@ -36,6 +34,7 @@
 
     public NotifyDeployBinaryWork(ClusterControllerService ccs, DeploymentId deploymentId, String nodeId,
             DeploymentStatus deploymentStatus) {
+        super(ccs, nodeId, null);
         this.ccs = ccs;
         this.nodeId = nodeId;
         this.deploymentId = deploymentId;
@@ -44,11 +43,10 @@
     }
 
     @Override
-    public void run() {
+    public void runWork() {
         /** triggered remotely by a NC to notify that the NC is deployed */
         DeploymentRun dRun = ccs.getDeploymentRun(deploymentId);
         dRun.notifyDeploymentStatus(nodeId, deploymentStatus);
-        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
     }
 
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
index 77cddf3..8e5050c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -51,7 +50,6 @@
                 jobletProfile.getTaskProfiles().put(taId, statistics);
             }
             run.getScheduler().notifyTaskComplete(ta);
-            HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
         } catch (HyracksException e) {
             e.printStackTrace();
         }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index 3578fd62..e00025d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 
 public class TaskFailureWork extends AbstractTaskLifecycleWork {
     private final List<Exception> exceptions;
@@ -39,7 +38,6 @@
         ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
         ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
         run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
-        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
deleted file mode 100644
index 51bbc16..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package edu.uci.ics.hyracks.control.cc.work.utils;
-
-import java.util.Map;
-
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
-
-public class HeartbeatUtils {
-
-    public static void notifyHeartbeat(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
-        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-        NodeControllerState state = nodeMap.get(nodeId);
-        if (state != null) {
-            state.notifyHeartbeat(hbData);
-        }
-    }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index b7fcf7f..6049a3b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -266,7 +266,7 @@
 
         heartbeatTask = new HeartbeatTask(ccs);
 
-        // Schedule heartbeat generator. 
+        // Schedule heartbeat generator.
         timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
 
         if (nodeParameters.getProfileDumpPeriod() > 0) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
index c1efa91..5f78362 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
@@ -20,6 +20,7 @@
     private final int runid;
     private FrameTupleAccessor acccessor;
     private int tupleIndex;
+    private int tupleOffset;
 
     public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex) {
         super();
diff --git a/pregelix/pom.xml b/pregelix/pom.xml
index de5ef3b..d748d1f 100644
--- a/pregelix/pom.xml
+++ b/pregelix/pom.xml
@@ -12,8 +12,7 @@
  ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ! See the License for the specific language governing permissions and
  ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ !--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>pregelix</artifactId>
@@ -22,7 +21,7 @@
   <name>pregelix</name>
 
   <properties>
-    <jvm.extraargs />
+    <jvm.extraargs/>
   </properties>
 
   <profiles>
@@ -107,6 +106,7 @@
     <module>pregelix-runtime</module>
     <module>pregelix-core</module>
     <module>pregelix-example</module>
+    <module>pregelix-benchmark</module>
     <module>pregelix-dist</module>
   </modules>
 </project>
diff --git a/pregelix/pregelix-benchmark/pom.xml b/pregelix/pregelix-benchmark/pom.xml
new file mode 100644
index 0000000..4d7d456
--- /dev/null
+++ b/pregelix/pregelix-benchmark/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<artifactId>pregelix</artifactId>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<version>0.2.10-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>pregelix-benchmark</artifactId>
+	<name>pregelix-benchmark</name>
+	<url>http://maven.apache.org</url>
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.giraph</groupId>
+			<artifactId>giraph-core</artifactId>
+			<version>1.0.0</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java
new file mode 100644
index 0000000..04c29de
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+public class PageRankVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+    public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
+    private final DoubleWritable vertexValue = new DoubleWritable();
+    private final DoubleWritable msg = new DoubleWritable();
+    private int maxIteration = -1;
+
+    @Override
+    public void compute(Iterable<DoubleWritable> msgIterator) {
+        if (maxIteration < 0) {
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+        }
+        if (getSuperstep() == 1) {
+            vertexValue.set(1.0 / getTotalNumVertices());
+        }
+        if (getSuperstep() >= 2 && getSuperstep() <= maxIteration) {
+            double sum = 0;
+            for (DoubleWritable msg : msgIterator) {
+                sum += msg.get();
+            }
+            vertexValue.set((0.15 / getTotalNumVertices()) + 0.85 * sum);
+        }
+
+        if (getSuperstep() >= 1 && getSuperstep() < maxIteration) {
+            long edges = getNumEdges();
+            msg.set(vertexValue.get() / edges);
+            sendMessageToAllEdges(msg);
+        } else {
+            voteToHalt();
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
new file mode 100644
index 0000000..3d85f66
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.benchmark;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.MapMutableEdge;
+import org.apache.giraph.io.formats.TextVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TextPageRankInputFormat extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+
+    @Override
+    public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+        return new TextVertexReaderFromEachLine() {
+            String[] items;
+
+            @Override
+            protected VLongWritable getId(Text line) throws IOException {
+                items = line.toString().split(" ");
+                return new VLongWritable(Long.parseLong(items[0]));
+            }
+
+            @Override
+            protected DoubleWritable getValue(Text line) throws IOException {
+                return null;
+            }
+
+            @Override
+            protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
+                List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
+                Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+                for (int i = 1; i < items.length; i++) {
+                    edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+                }
+                for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
+                    MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+                    edge.setEntry(entry);
+                    edge.setValue(null);
+                    edges.add(edge);
+                }
+                return edges;
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-dist/pom.xml b/pregelix/pregelix-dist/pom.xml
index f0551a6..cec6efe 100644
--- a/pregelix/pregelix-dist/pom.xml
+++ b/pregelix/pregelix-dist/pom.xml
@@ -63,5 +63,10 @@
 			<artifactId>pregelix-example</artifactId>
 			<version>0.2.10-SNAPSHOT</version>
 		</dependency>
+		<dependency>
+                        <groupId>edu.uci.ics.hyracks</groupId>
+                        <artifactId>pregelix-benchmark</artifactId>
+                        <version>0.2.10-SNAPSHOT</version>
+                </dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
index 90065c2..44d23a4 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
@@ -14,13 +14,8 @@
  */
 package edu.uci.ics.pregelix.example.data;
 
-import java.io.DataInput;
-import java.io.DataInputStream;
-
-import org.apache.hadoop.io.WritableUtils;
-
 import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
-import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
+import edu.uci.ics.pregelix.example.utils.SerDeUtils;
 
 /**
  * @author yingyib
@@ -31,14 +26,10 @@
     private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
     private static final int NEGATIVE_LONG_MASK = (0 << 30);
 
-    private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
-    private DataInput dis = new DataInputStream(bis);
-
     @Override
     public int getNormalizedKey(byte[] bytes, int start, int length) {
         try {
-            bis.setByteArray(bytes, start);
-            long value = WritableUtils.readVLong(dis);
+            long value = SerDeUtils.readVLong(bytes, start, length);
             int highValue = (int) (value >> 32);
             if (highValue > 0) {
                 /**