add runtime checks to improve pregelix debug-ability
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
index 7d5f627..f9cad9f 100755
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
@@ -36,19 +36,12 @@
public int read() {
int remaining = data.length - position;
int value = remaining > 0 ? (data[position++] & 0xff) : -1;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
- }
return value;
}
@Override
public int read(byte[] bytes, int offset, int length) {
int remaining = data.length - position;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
- + length + " position: " + position);
- }
if (remaining == 0) {
return -1;
}
@@ -57,4 +50,9 @@
position += l;
return l;
}
-}
\ No newline at end of file
+
+ @Override
+ public int available() {
+ return data.length - position;
+ }
+}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index 8709301..8e731ba 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -33,8 +33,8 @@
int srcStart = fieldEndOffsets[0];
int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
- if (srcLen <= frSize) {
- //doing in-place update if possible, save the "real update" overhead
+ if (srcLen == frSize) {
+ //doing in-place update if the vertex size is fixed, save the "real update" overhead
System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
frameTuple.getFieldStart(1), srcLen);
cloneUpdateTb.reset();
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
index b697466..5be9ffc 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/ResetableByteArrayInputStream.java
@@ -15,11 +15,8 @@
package edu.uci.ics.pregelix.dataflow.util;
import java.io.InputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
public class ResetableByteArrayInputStream extends InputStream {
- private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
private byte[] data;
private int position;
@@ -36,19 +33,12 @@
public int read() {
int remaining = data.length - position;
int value = remaining > 0 ? (data[position++] & 0xff) : -1;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
- }
return value;
}
@Override
public int read(byte[] bytes, int offset, int length) {
int remaining = data.length - position;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
- + length + " position: " + position);
- }
if (remaining == 0) {
return -1;
}
@@ -57,4 +47,9 @@
position += l;
return l;
}
+
+ @Override
+ public int available() {
+ return data.length - position;
+ }
}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 6386bdb..85f3959 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -49,7 +49,7 @@
int availableBefore = bbis.available();
Object instance = recordDescriptor.getFields()[i].deserialize(di);
int availableAfter = bbis.available();
- if (availableBefore - availableAfter > len) {
+ if (availableBefore - availableAfter != len) {
throw new IllegalStateException(ERROR_MSG);
}
@@ -82,7 +82,7 @@
int availableBefore = bbis.available();
Object instance = recordDescriptor.getFields()[i].deserialize(di);
int availableAfter = bbis.available();
- if (availableBefore - availableAfter > fieldLength) {
+ if (availableBefore - availableAfter != fieldLength) {
throw new IllegalStateException(ERROR_MSG);
}
@@ -97,7 +97,7 @@
int availableBefore = bbis.available();
Object instance = recordDescriptor.getFields()[i].deserialize(di);
int availableAfter = bbis.available();
- if (availableBefore - availableAfter > len) {
+ if (availableBefore - availableAfter != len) {
throw new IllegalStateException(ERROR_MSG);
}
record[i] = instance;
@@ -127,7 +127,7 @@
int availableBefore = bbis.available();
Object instance = recordDescriptor.getFields()[i].deserialize(di);
int availableAfter = bbis.available();
- if (availableBefore - availableAfter > fieldLength) {
+ if (availableBefore - availableAfter != fieldLength) {
throw new IllegalStateException(ERROR_MSG);
}
record[i] = instance;
@@ -141,7 +141,7 @@
int availableBefore = bbis.available();
Object instance = recordDescriptor.getFields()[i].deserialize(di);
int availableAfter = bbis.available();
- if (availableBefore - availableAfter > fieldLength) {
+ if (availableBefore - availableAfter != fieldLength) {
throw new IllegalStateException(ERROR_MSG);
}
record[i] = instance;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
index 4eaa21c..b7689de 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
@@ -32,6 +32,7 @@
@Override
public void writeNull(DataOutput out) throws HyracksDataException {
try {
+ out.writeByte(3); //start|end
out.writeInt(0);
} catch (IOException e) {
throw new HyracksDataException(e);