use hadoop's vlong and fix driver's re-attempt to be 1
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index c52130d..26cb8d0 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -35,7 +36,6 @@
import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.JobStateUtils;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
/**
* User applications should all inherit {@link Vertex}, and implement their own
@@ -270,7 +270,7 @@
delegate.setVertex(this);
}
destEdgeList.clear();
- long edgeMapSize = SerDeUtils.readVLong(in);
+ long edgeMapSize = WritableUtils.readVLong(in);
for (long i = 0; i < edgeMapSize; ++i) {
Edge<I, E> edge = allocateEdge();
edge.setConf(getContext().getConfiguration());
@@ -278,7 +278,7 @@
addEdge(edge);
}
msgList.clear();
- long msgListSize = SerDeUtils.readVLong(in);
+ long msgListSize = WritableUtils.readVLong(in);
for (long i = 0; i < msgListSize; ++i) {
M msg = allocateMessage();
msg.readFields(in);
@@ -297,11 +297,11 @@
if (vertexValue != null) {
vertexValue.write(out);
}
- SerDeUtils.writeVLong(out, destEdgeList.size());
+ WritableUtils.writeVLong(out, destEdgeList.size());
for (Edge<I, E> edge : destEdgeList) {
edge.write(out);
}
- SerDeUtils.writeVLong(out, msgList.size());
+ WritableUtils.writeVLong(out, msgList.size());
for (M msg : msgList) {
msg.write(out);
}
@@ -618,7 +618,6 @@
* Terminate the current partition where the current vertex stays in.
* This will immediately take effect and the upcoming vertice in the
* same partition cannot be processed.
- *
*/
protected final void terminatePartition() {
voteToHalt();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
index 25b07ff..a4336a3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
@@ -40,47 +40,4 @@
object.readFields(input);
}
- public static long readVLong(DataInput in) throws IOException {
- int vLen = 0;
- long value = 0L;
- while (true) {
- byte b = (byte) in.readByte();
- ++vLen;
- value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
- if ((b & 0x80) == 0) {
- break;
- }
- }
- return value;
- }
-
- public static void writeVLong(DataOutput out, long value) throws IOException {
- long data = value;
- do {
- byte b = (byte) (data & 0x7f);
- data >>= 7;
- if (data != 0) {
- b |= 0x80;
- }
- out.write(b);
- } while (data != 0);
- }
-
- public static long readVLong(byte[] data, int start, int length) {
- int vLen = 0;
- long value = 0L;
- while (true) {
- byte b = (byte) data[start];
- ++vLen;
- value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
- if ((b & 0x80) == 0) {
- break;
- }
- ++start;
- }
- if (vLen != length)
- throw new IllegalStateException("length mismatch -- vLen:" + vLen + " length:" + length);
- return value;
- }
-
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 2d4064b..5325397 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -204,6 +204,7 @@
private void execute(DeploymentId deploymentId, JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
+ job.setMaxReattempts(0);
JobId jobId = hcc.startJob(deploymentId, job,
profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);