address Vinayak's code review comments
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractHeartbeatWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractHeartbeatWork.java
new file mode 100644
index 0000000..7605295
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractHeartbeatWork.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public abstract class AbstractHeartbeatWork extends SynchronizableWork {
+
+ private final ClusterControllerService ccs;
+ private final String nodeId;
+ private final HeartbeatData hbData;
+
+ public AbstractHeartbeatWork(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
+ this.ccs = ccs;
+ this.nodeId = nodeId;
+ this.hbData = hbData;
+ }
+
+ @Override
+ public void doRun() {
+ Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+ NodeControllerState state = nodeMap.get(nodeId);
+ if (state != null) {
+ state.notifyHeartbeat(hbData);
+ }
+ runWork();
+ }
+
+ public abstract void runWork();
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index 58aaa57..bcb278b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -29,15 +29,15 @@
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-public abstract class AbstractTaskLifecycleWork extends AbstractWork {
+public abstract class AbstractTaskLifecycleWork extends AbstractHeartbeatWork {
protected final ClusterControllerService ccs;
protected final JobId jobId;
protected final TaskAttemptId taId;
protected final String nodeId;
public AbstractTaskLifecycleWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId) {
+ super(ccs, nodeId, null);
this.ccs = ccs;
this.jobId = jobId;
this.taId = taId;
@@ -45,7 +45,7 @@
}
@Override
- public final void run() {
+ public final void runWork() {
JobRun run = ccs.getActiveRunMap().get(jobId);
if (run != null) {
TaskId tid = taId.getTaskId();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
index 5109078..c4d202f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -21,14 +21,12 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.messages.IMessage;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
/**
* @author rico
*/
-public class ApplicationMessageWork extends AbstractWork {
+public class ApplicationMessageWork extends AbstractHeartbeatWork {
private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
private byte[] message;
@@ -37,6 +35,7 @@
private ClusterControllerService ccs;
public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId, String nodeId) {
+ super(ccs, nodeId, null);
this.ccs = ccs;
this.deploymentId = deploymentId;
this.nodeId = nodeId;
@@ -44,7 +43,7 @@
}
@Override
- public void run() {
+ public void runWork() {
final ICCApplicationContext ctx = ccs.getApplicationContext();
try {
final IMessage data = (IMessage) DeploymentUtils.deserialize(message, deploymentId, ctx);
@@ -58,7 +57,6 @@
LOGGER.log(Level.WARNING, "Error in stats reporting", e);
throw new RuntimeException(e);
}
- HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 22c3348..46a7c16 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -49,7 +49,7 @@
@Override
public void run() {
- LOGGER.warning("Cleanup for JobRun with id: " + jobId);
+ LOGGER.info("Cleanup for JobRun with id: " + jobId);
final JobRun run = ccs.getActiveRunMap().get(jobId);
if (run == null) {
LOGGER.warning("Unable to find JobRun with id: " + jobId);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 66186ca..2d6bdea 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -28,10 +28,8 @@
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-public class JobletCleanupNotificationWork extends AbstractWork {
+public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
private static final Logger LOGGER = Logger.getLogger(JobletCleanupNotificationWork.class.getName());
private ClusterControllerService ccs;
@@ -39,14 +37,14 @@
private String nodeId;
public JobletCleanupNotificationWork(ClusterControllerService ccs, JobId jobId, String nodeId) {
+ super(ccs, nodeId, null);
this.ccs = ccs;
this.jobId = jobId;
this.nodeId = nodeId;
}
@Override
- public void run() {
- HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
+ public void runWork() {
final JobRun run = ccs.getActiveRunMap().get(jobId);
Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
if (!cleanupPendingNodes.remove(nodeId)) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
index eef6b96..8ef8f66 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -17,24 +17,17 @@
import java.util.logging.Level;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-public class NodeHeartbeatWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
- private final String nodeId;
- private final HeartbeatData hbData;
+public class NodeHeartbeatWork extends AbstractHeartbeatWork {
public NodeHeartbeatWork(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
- this.ccs = ccs;
- this.nodeId = nodeId;
- this.hbData = hbData;
+ super(ccs, nodeId, hbData);
}
@Override
- protected void doRun() throws Exception {
- HeartbeatUtils.notifyHeartbeat(ccs, nodeId, hbData);
+ public void runWork() {
+
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
index e400fa8..c35f385 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -17,17 +17,15 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
/***
* This is the work happens on the CC when CC gets a deployment or undeployment notification status message from one NC.
*
* @author yingyib
*/
-public class NotifyDeployBinaryWork extends AbstractWork {
+public class NotifyDeployBinaryWork extends AbstractHeartbeatWork {
private final ClusterControllerService ccs;
private final String nodeId;
@@ -36,6 +34,7 @@
public NotifyDeployBinaryWork(ClusterControllerService ccs, DeploymentId deploymentId, String nodeId,
DeploymentStatus deploymentStatus) {
+ super(ccs, nodeId, null);
this.ccs = ccs;
this.nodeId = nodeId;
this.deploymentId = deploymentId;
@@ -44,11 +43,10 @@
}
@Override
- public void run() {
+ public void runWork() {
/** triggered remotely by a NC to notify that the NC is deployed */
DeploymentRun dRun = ccs.getDeploymentRun(deploymentId);
dRun.notifyDeploymentStatus(nodeId, deploymentStatus);
- HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
index 77cddf3..8e5050c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
@@ -22,7 +22,6 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -51,7 +50,6 @@
jobletProfile.getTaskProfiles().put(taId, statistics);
}
run.getScheduler().notifyTaskComplete(ta);
- HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
} catch (HyracksException e) {
e.printStackTrace();
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index 3578fd62..e00025d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,7 +22,6 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
public class TaskFailureWork extends AbstractTaskLifecycleWork {
private final List<Exception> exceptions;
@@ -39,7 +38,6 @@
ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
- HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
deleted file mode 100644
index 51bbc16..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package edu.uci.ics.hyracks.control.cc.work.utils;
-
-import java.util.Map;
-
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
-
-public class HeartbeatUtils {
-
- public static void notifyHeartbeat(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
- Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
- NodeControllerState state = nodeMap.get(nodeId);
- if (state != null) {
- state.notifyHeartbeat(hbData);
- }
- }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index b7fcf7f..6049a3b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -266,7 +266,7 @@
heartbeatTask = new HeartbeatTask(ccs);
- // Schedule heartbeat generator.
+ // Schedule heartbeat generator.
timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
if (nodeParameters.getProfileDumpPeriod() > 0) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
index c1efa91..5f78362 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
@@ -20,6 +20,7 @@
private final int runid;
private FrameTupleAccessor acccessor;
private int tupleIndex;
+ private int tupleOffset;
public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex) {
super();
diff --git a/pregelix/pom.xml b/pregelix/pom.xml
index de5ef3b..d748d1f 100644
--- a/pregelix/pom.xml
+++ b/pregelix/pom.xml
@@ -12,8 +12,7 @@
! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
! See the License for the specific language governing permissions and
! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ !--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
@@ -22,7 +21,7 @@
<name>pregelix</name>
<properties>
- <jvm.extraargs />
+ <jvm.extraargs/>
</properties>
<profiles>
@@ -107,6 +106,7 @@
<module>pregelix-runtime</module>
<module>pregelix-core</module>
<module>pregelix-example</module>
+ <module>pregelix-benchmark</module>
<module>pregelix-dist</module>
</modules>
</project>
diff --git a/pregelix/pregelix-benchmark/pom.xml b/pregelix/pregelix-benchmark/pom.xml
new file mode 100644
index 0000000..4d7d456
--- /dev/null
+++ b/pregelix/pregelix-benchmark/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>pregelix</artifactId>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <version>0.2.10-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pregelix-benchmark</artifactId>
+ <name>pregelix-benchmark</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-core</artifactId>
+ <version>1.0.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java
new file mode 100644
index 0000000..04c29de
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+public class PageRankVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+ public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
+ private final DoubleWritable vertexValue = new DoubleWritable();
+ private final DoubleWritable msg = new DoubleWritable();
+ private int maxIteration = -1;
+
+ @Override
+ public void compute(Iterable<DoubleWritable> msgIterator) {
+ if (maxIteration < 0) {
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+ }
+ if (getSuperstep() == 1) {
+ vertexValue.set(1.0 / getTotalNumVertices());
+ }
+ if (getSuperstep() >= 2 && getSuperstep() <= maxIteration) {
+ double sum = 0;
+ for (DoubleWritable msg : msgIterator) {
+ sum += msg.get();
+ }
+ vertexValue.set((0.15 / getTotalNumVertices()) + 0.85 * sum);
+ }
+
+ if (getSuperstep() >= 1 && getSuperstep() < maxIteration) {
+ long edges = getNumEdges();
+ msg.set(vertexValue.get() / edges);
+ sendMessageToAllEdges(msg);
+ } else {
+ voteToHalt();
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
new file mode 100644
index 0000000..3d85f66
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.benchmark;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.MapMutableEdge;
+import org.apache.giraph.io.formats.TextVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TextPageRankInputFormat extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+
+ @Override
+ public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TextVertexReaderFromEachLine() {
+ String[] items;
+
+ @Override
+ protected VLongWritable getId(Text line) throws IOException {
+ items = line.toString().split(" ");
+ return new VLongWritable(Long.parseLong(items[0]));
+ }
+
+ @Override
+ protected DoubleWritable getValue(Text line) throws IOException {
+ return null;
+ }
+
+ @Override
+ protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
+ List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
+ Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+ for (int i = 1; i < items.length; i++) {
+ edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+ }
+ for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
+ MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+ edge.setEntry(entry);
+ edge.setValue(null);
+ edges.add(edge);
+ }
+ return edges;
+ }
+
+ };
+ }
+}
diff --git a/pregelix/pregelix-dist/pom.xml b/pregelix/pregelix-dist/pom.xml
index f0551a6..cec6efe 100644
--- a/pregelix/pregelix-dist/pom.xml
+++ b/pregelix/pregelix-dist/pom.xml
@@ -63,5 +63,10 @@
<artifactId>pregelix-example</artifactId>
<version>0.2.10-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix-benchmark</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
index 90065c2..44d23a4 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
@@ -14,13 +14,8 @@
*/
package edu.uci.ics.pregelix.example.data;
-import java.io.DataInput;
-import java.io.DataInputStream;
-
-import org.apache.hadoop.io.WritableUtils;
-
import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
-import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
+import edu.uci.ics.pregelix.example.utils.SerDeUtils;
/**
* @author yingyib
@@ -31,14 +26,10 @@
private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
private static final int NEGATIVE_LONG_MASK = (0 << 30);
- private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
- private DataInput dis = new DataInputStream(bis);
-
@Override
public int getNormalizedKey(byte[] bytes, int start, int length) {
try {
- bis.setByteArray(bytes, start);
- long value = WritableUtils.readVLong(dis);
+ long value = SerDeUtils.readVLong(bytes, start, length);
int highValue = (int) (value >> 32);
if (highValue > 0) {
/**