inherit Hadoop's VLongWritable format
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
index 7d824ea..90065c2 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
@@ -14,8 +14,13 @@
*/
package edu.uci.ics.pregelix.example.data;
+import java.io.DataInput;
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.WritableUtils;
+
import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
/**
* @author yingyib
@@ -26,34 +31,42 @@
private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
private static final int NEGATIVE_LONG_MASK = (0 << 30);
+ private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+ private DataInput dis = new DataInputStream(bis);
+
@Override
public int getNormalizedKey(byte[] bytes, int start, int length) {
- long value = SerDeUtils.readVLong(bytes, start, length);
- int highValue = (int) (value >> 32);
- if (highValue > 0) {
- /**
- * larger than Integer.MAX
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= POSTIVE_LONG_MASK;
- return highNmk;
- } else if (highValue == 0) {
- /**
- * smaller than Integer.MAX but >=0
- */
- int lowNmk = (int) value;
- lowNmk >>= 2;
- lowNmk |= NON_NEGATIVE_INT_MASK;
- return lowNmk;
- } else {
- /**
- * less than 0; TODO: have not optimized for that
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= NEGATIVE_LONG_MASK;
- return highNmk;
+ try {
+ bis.setByteArray(bytes, start);
+ long value = WritableUtils.readVLong(dis);
+ int highValue = (int) (value >> 32);
+ if (highValue > 0) {
+ /**
+ * larger than Integer.MAX
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= POSTIVE_LONG_MASK;
+ return highNmk;
+ } else if (highValue == 0) {
+ /**
+ * smaller than Integer.MAX but >=0
+ */
+ int lowNmk = (int) value;
+ lowNmk >>= 2;
+ lowNmk |= NON_NEGATIVE_INT_MASK;
+ return lowNmk;
+ } else {
+ /**
+ * less than 0; TODO: have not optimized for that
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= NEGATIVE_LONG_MASK;
+ return highNmk;
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
}
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
index ec1109f..1c5f629 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -16,14 +16,14 @@
package edu.uci.ics.pregelix.example.io;
import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+import java.io.DataInputStream;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
import edu.uci.ics.pregelix.api.io.WritableSizable;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
/**
* A WritableComparable for longs in a variable-length format. Such values take
@@ -32,17 +32,7 @@
* @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
*/
@SuppressWarnings("rawtypes")
-public class VLongWritable implements WritableComparable, WritableSizable {
- private static long ONE_BYTE_MAX = 2 ^ 7 - 1;
- private static long TWO_BYTE_MAX = 2 ^ 14 - 1;
- private static long THREE_BYTE_MAX = 2 ^ 21 - 1;
- private static long FOUR_BYTE_MAX = 2 ^ 28 - 1;
- private static long FIVE_BYTE_MAX = 2 ^ 35 - 1;;
- private static long SIX_BYTE_MAX = 2 ^ 42 - 1;;
- private static long SEVEN_BYTE_MAX = 2 ^ 49 - 1;;
- private static long EIGHT_BYTE_MAX = 2 ^ 54 - 1;;
-
- private long value;
+public class VLongWritable extends org.apache.hadoop.io.VLongWritable implements WritableSizable {
public VLongWritable() {
}
@@ -52,78 +42,46 @@
}
public int sizeInBytes() {
- if (value >= 0 && value <= ONE_BYTE_MAX) {
+ long i = get();
+ if (i >= -112 && i <= 127) {
return 1;
- } else if (value > ONE_BYTE_MAX && value <= TWO_BYTE_MAX) {
- return 2;
- } else if (value > TWO_BYTE_MAX && value <= THREE_BYTE_MAX) {
- return 3;
- } else if (value > THREE_BYTE_MAX && value <= FOUR_BYTE_MAX) {
- return 4;
- } else if (value > FOUR_BYTE_MAX && value <= FIVE_BYTE_MAX) {
- return 5;
- } else if (value > FIVE_BYTE_MAX && value <= SIX_BYTE_MAX) {
- return 6;
- } else if (value > SIX_BYTE_MAX && value <= SEVEN_BYTE_MAX) {
- return 7;
- } else if (value > SEVEN_BYTE_MAX && value <= EIGHT_BYTE_MAX) {
- return 8;
- } else {
- return 9;
}
- }
- /** Set the value of this LongWritable. */
- public void set(long value) {
- this.value = value;
- }
+ int len = -112;
+ if (i < 0) {
+ i ^= -1L; // take one's complement'
+ len = -120;
+ }
- /** Return the value of this LongWritable. */
- public long get() {
- return value;
- }
+ long tmp = i;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
- public void readFields(DataInput in) throws IOException {
- value = SerDeUtils.readVLong(in);
- }
-
- public void write(DataOutput out) throws IOException {
- SerDeUtils.writeVLong(out, value);
- }
-
- /** Returns true iff <code>o</code> is a VLongWritable with the same value. */
- public boolean equals(Object o) {
- if (!(o instanceof VLongWritable))
- return false;
- VLongWritable other = (VLongWritable) o;
- return this.value == other.value;
- }
-
- public int hashCode() {
- return (int) value;
- }
-
- /** Compares two VLongWritables. */
- public int compareTo(Object o) {
- long thisValue = this.value;
- long thatValue = ((VLongWritable) o).value;
- return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
- }
-
- public String toString() {
- return Long.toString(value);
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+ return len + 1;
}
/** A Comparator optimized for LongWritable. */
public static class Comparator extends WritableComparator {
+ private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+ private DataInput dis = new DataInputStream(bis);
+
public Comparator() {
super(VLongWritable.class);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- long thisValue = SerDeUtils.readVLong(b1, s1, l1);
- long thatValue = SerDeUtils.readVLong(b2, s2, l2);
- return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+ try {
+ bis.setByteArray(b1, s1);
+ long thisValue = WritableUtils.readVLong(dis);
+ bis.setByteArray(b2, s2);
+ long thatValue = WritableUtils.readVLong(dis);
+ return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
}
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
index 196b114..638011b 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
@@ -41,10 +41,13 @@
public void testVLong() throws Exception {
Random rand = new Random(System.currentTimeMillis());
MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ msgList.add(new VLongWritable(Long.MAX_VALUE));
+ msgList.add(new VLongWritable(Long.MIN_VALUE));
+ msgList.add(new VLongWritable(-1));
for (int i = 0; i < 1000000; i++) {
msgList.add(new VLongWritable(Math.abs(rand.nextLong())));
}
- verifySizeEstimation(msgList);
+ verifyExactSizeEstimation(msgList);
}
@Test
@@ -96,7 +99,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testNull() throws Exception {
MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
@@ -105,7 +108,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testVInt() throws Exception {
Random rand = new Random(System.currentTimeMillis());
@@ -115,7 +118,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testInt() throws Exception {
Random rand = new Random(System.currentTimeMillis());
@@ -148,4 +151,26 @@
}
}
+ private void verifyExactSizeEstimation(MsgList<WritableSizable> msgList) throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput dos = new DataOutputStream(bos);
+ int accumulatedSize = 5;
+ for (int i = 0; i < msgList.size(); i++) {
+ bos.reset();
+ WritableSizable value = msgList.get(i);
+ value.write(dos);
+ if (value.sizeInBytes() != bos.size()) {
+ throw new Exception(value + " estimated size (" + value.sizeInBytes()
+ + ") is smaller than the actual size" + bos.size());
+ }
+ accumulatedSize += value.sizeInBytes();
+ }
+ bos.reset();
+ msgList.write(dos);
+ if (accumulatedSize < bos.size()) {
+ throw new Exception("Estimated list size (" + accumulatedSize + ") is smaller than the actual size"
+ + bos.size());
+ }
+ }
+
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index d46457c..ab564fa 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -205,7 +205,11 @@
*/
if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
terminate = false;
- aggregator.step(vertex);
+
+ if (msgContentList.segmentEnd()) {
+ /** the if condition makes sure aggregate only calls once per-vertex */
+ aggregator.step(vertex);
+ }
}
@Override