fix in-place update
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 6564eb0..245c0f5 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -141,6 +141,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-common</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-btree</artifactId>
<version>0.2.10-SNAPSHOT</version>
<type>jar</type>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index f3a0bb4..b1d1043 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -31,6 +31,9 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -110,7 +113,8 @@
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration(ctx);
- this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+ //LSM index does not have in-place update
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
@@ -262,7 +266,8 @@
}
@Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
if (!dynamicStateLength) {
@@ -271,12 +276,16 @@
int offset = tupleRef.getFieldStart(1);
bbos.setByteArray(data, offset);
vertex.write(output);
+
+ BTreeRangeSearchCursor btreeCursor = (BTreeRangeSearchCursor) cursor;
+ ICachedPageInternal page = (ICachedPageInternal) btreeCursor.getPage();
+ //IMPORTANT: mark the page to be dirty
+ page.markDirty();
} else {
// write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();
vertex.getVertexId().write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
-
// write the vertex value
vertex.write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
index 88577c2..8947c01 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -57,7 +58,8 @@
}
@Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index ca8ec01..05b4e87 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -31,6 +31,9 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -113,7 +116,8 @@
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration(ctx);
- this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+ //LSM index does not have in-place update
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);;
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
@@ -255,7 +259,8 @@
}
@Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
if (!dynamicStateLength) {
@@ -264,6 +269,11 @@
int offset = tupleRef.getFieldStart(1);
bbos.setByteArray(data, offset);
vertex.write(output);
+
+ BTreeRangeSearchCursor btreeCursor = (BTreeRangeSearchCursor) cursor;
+ ICachedPageInternal page = (ICachedPageInternal) btreeCursor.getPage();
+ //IMPORTANT: mark the page to be dirty
+ page.markDirty();
} else {
// write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();