Fix in-place update: mark the buffer-cache page dirty after a vertex value is overwritten in place
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 2992dfe..ac722e2 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -477,9 +477,14 @@
private void acquireWriteLatch(boolean markDirty) {
latch.writeLock().lock();
if (markDirty) {
- if (dirty.compareAndSet(false, true)) {
- pinCount.incrementAndGet();
- }
+ markDirty();
+ }
+ }
+
+ @Override
+ public void markDirty() { // lets code that overwrote page bytes directly flag this page as dirty
+ if (dirty.compareAndSet(false, true)) { // only the clean -> dirty transition adds a pin
+ pinCount.incrementAndGet(); // NOTE(review): presumably keeps the dirty page pinned until it is written back - confirm
}
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java
index 22c02d1..b2ba638 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ICachedPageInternal.java
@@ -20,4 +20,6 @@
public Object getReplacementStrategyObject();
public boolean pinIfGoodVictim();
+
+ public void markDirty();
}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 2e83d2d..fcbf9f2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -17,7 +17,6 @@
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index d4c0ee6..2b435a1 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -1,28 +1,24 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License"); ! you may
+ not use this file except in compliance with the License. ! you may obtain
+ a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0
+ ! ! Unless required by applicable law or agreed to in writing, software !
+ distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
+ License for the specific language governing permissions and ! limitations
+ under the License. ! -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow-std-base</artifactId>
<packaging>jar</packaging>
<name>pregelix-dataflow-std-base</name>
<parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix</artifactId>
- <version>0.2.10-SNAPSHOT</version>
- </parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ </parent>
<properties>
@@ -58,7 +54,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.4.1</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
@@ -94,6 +90,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
<version>0.2.10-SNAPSHOT</version>
<type>jar</type>
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
index b8ba7bd..081b3bc 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
@@ -18,17 +18,18 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
public interface IUpdateFunction extends IFunction {
- /**
- * update the tuple pointed by tupleRef called after process,
- * one-input-tuple-at-a-time
- *
- * @param tupleRef
- * @throws HyracksDataException
- */
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb)
- throws HyracksDataException;
+ /**
+ * update the tuple pointed by tupleRef called after process,
+ * one-input-tuple-at-a-time
+ *
+ * @param tupleRef
+ * @throws HyracksDataException
+ */
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException;
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 2332188..b646661 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -43,6 +44,7 @@
import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopJoinFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
@@ -68,6 +70,7 @@
private ArrayTupleBuilder cloneUpdateTb;
private final UpdateBuffer updateBuffer;
private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+ private final StorageType storageType;
public IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -77,6 +80,11 @@
throws HyracksDataException {
treeIndexOpHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
+ if (treeIndexOpHelper instanceof TreeIndexDataflowHelper) {
+ storageType = StorageType.TreeIndex;
+ } else {
+ storageType = StorageType.LSMIndex;
+ }
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
@@ -170,13 +178,13 @@
/**
* call the update function
*/
- functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
+ functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb, cursor);
/**
* doing copy update
*/
CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
- cursor, rangePred, false);
+ cursor, rangePred, false, storageType);
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index 3a36ab4..2557a07 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -45,6 +46,7 @@
import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable extends
@@ -79,6 +81,7 @@
private ArrayTupleBuilder cloneUpdateTb;
private final UpdateBuffer updateBuffer;
private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+ private final StorageType storageType;
public IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -88,6 +91,11 @@
inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
treeIndexOpHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
+ if (treeIndexOpHelper instanceof TreeIndexDataflowHelper) {
+ storageType = StorageType.TreeIndex;
+ } else {
+ storageType = StorageType.LSMIndex;
+ }
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
if (lowKeyFields != null && lowKeyFields.length > 0) {
@@ -287,13 +295,13 @@
/**
* function call
*/
- functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
+ functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb, cursor);
/**
* doing clone update
*/
CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
- cursor, rangePred, true);
+ cursor, rangePred, true, storageType);
}
/** write result for outer case */
@@ -301,11 +309,11 @@
/**
* function call
*/
- functionProxy.functionCall(nullTupleBuilder, frameTuple, cloneUpdateTb);
+ functionProxy.functionCall(nullTupleBuilder, frameTuple, cloneUpdateTb, cursor);
//doing clone update
CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
- rangePred, true);
+ rangePred, true, storageType);
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 8d97289..1d9fd70 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -43,6 +44,7 @@
import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
@@ -71,6 +73,7 @@
private ArrayTupleBuilder cloneUpdateTb;
private UpdateBuffer updateBuffer;
private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+ private final StorageType storageType;
public IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -79,6 +82,11 @@
IRecordDescriptorFactory inputRdFactory, int outputArity) throws HyracksDataException {
treeIndexOpHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
+ if (treeIndexOpHelper instanceof TreeIndexDataflowHelper) {
+ storageType = StorageType.TreeIndex;
+ } else {
+ storageType = StorageType.LSMIndex;
+ }
this.isForward = isForward;
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
@@ -238,21 +246,21 @@
/** write the right result */
private void writeRightResults(ITupleReference frameTuple) throws Exception {
- functionProxy.functionCall(frameTuple, cloneUpdateTb);
+ functionProxy.functionCall(frameTuple, cloneUpdateTb, cursor);
//doing clone update
CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
- rangePred, true);
+ rangePred, true, storageType);
}
/** write the left result */
private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
throws Exception {
- functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+ functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb, cursor);
//doing clone update
CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
- rangePred, true);
+ rangePred, true, storageType);
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index dbdbfa2..1003431 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -44,6 +45,7 @@
import edu.uci.ics.pregelix.dataflow.util.CopyUpdateUtil;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
+import edu.uci.ics.pregelix.dataflow.util.StorageType;
import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class TreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
@@ -77,6 +79,7 @@
private ArrayTupleBuilder cloneUpdateTb;
private final UpdateBuffer updateBuffer;
private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
+ private final StorageType storageType;
public TreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -86,6 +89,11 @@
throws HyracksDataException {
treeIndexHelper = (IndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
+ if (treeIndexHelper instanceof TreeIndexDataflowHelper) {
+ storageType = StorageType.TreeIndex;
+ } else {
+ storageType = StorageType.LSMIndex;
+ }
this.isForward = isForward;
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
@@ -171,11 +179,11 @@
while (cursor.hasNext()) {
cursor.next();
ITupleReference tuple = cursor.getTuple();
- functionProxy.functionCall(tuple, cloneUpdateTb);
+ functionProxy.functionCall(tuple, cloneUpdateTb, cursor);
//doing clone update
CopyUpdateUtil.copyUpdate(tempTupleReference, tuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
- rangePred, true);
+ rangePred, true, storageType);
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index d2478ea..58ec8ab 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -18,27 +18,36 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;
public class CopyUpdateUtil {
public static void copyUpdate(SearchKeyTupleReference tempTupleReference, ITupleReference frameTuple,
UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, IIndexAccessor indexAccessor,
- IIndexCursor cursor, RangePredicate rangePred, boolean scan) throws HyracksDataException, IndexException {
+ IIndexCursor cursor, RangePredicate rangePred, boolean scan, StorageType type) throws HyracksDataException,
+ IndexException {
if (cloneUpdateTb.getSize() > 0) {
- int[] fieldEndOffsets = cloneUpdateTb.getFieldEndOffsets();
- int srcStart = fieldEndOffsets[0];
- int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
- int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
- if (srcLen == frSize) {
- //doing in-place update if the vertex size is not larger than the original size, save the "real update" overhead
- System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
- frameTuple.getFieldStart(1), srcLen);
- cloneUpdateTb.reset();
- return;
+ if (type == StorageType.TreeIndex) {
+ int[] fieldEndOffsets = cloneUpdateTb.getFieldEndOffsets();
+ int srcStart = fieldEndOffsets[0];
+ int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
+ int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
+ if (srcLen == frSize) {
+ System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
+ frameTuple.getFieldStart(1), srcLen);
+ cloneUpdateTb.reset();
+
+ BTreeRangeSearchCursor btreeCursor = (BTreeRangeSearchCursor) cursor;
+ ICachedPageInternal page = (ICachedPageInternal) btreeCursor.getPage();
+ //IMPORTANT: mark the page to be dirty
+ page.markDirty();
+ return;
+ }
}
if (!updateBuffer.appendTuple(cloneUpdateTb)) {
tempTupleReference.reset(frameTuple.getFieldData(0), frameTuple.getFieldStart(0),
@@ -51,7 +60,7 @@
if (!updateBuffer.appendTuple(cloneUpdateTb)) {
throw new HyracksDataException("cannot append tuple builder!");
}
- //search again and recover the cursor
+ //search again and restore the cursor to the exact position it had before it was closed
cursor.reset();
rangePred.setLowKey(tempTupleReference, true);
if (scan) {
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index a1e5b86..e5bdb17 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
@@ -80,10 +81,10 @@
* @throws HyracksDataException
*/
public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right,
- ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor) throws HyracksDataException {
Object[] tuple = tupleDe.deserializeRecord(leftAccessor, leftTupleIndex, right);
function.process(tuple);
- function.update(right, cloneUpdateTb);
+ function.update(right, cloneUpdateTb, cursor);
}
/**
@@ -92,10 +93,11 @@
* @param updateRef
* @throws HyracksDataException
*/
- public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
Object[] tuple = tupleDe.deserializeRecord(updateRef);
function.process(tuple);
- function.update(updateRef, cloneUpdateTb);
+ function.update(updateRef, cloneUpdateTb, cursor);
}
/**
@@ -107,11 +109,11 @@
* update pointer
* @throws HyracksDataException
*/
- public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb)
- throws HyracksDataException {
+ public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb,
+ IIndexCursor cursor) throws HyracksDataException {
Object[] tuple = tupleDe.deserializeRecord(tb, inPlaceUpdateRef);
function.process(tuple);
- function.update(inPlaceUpdateRef, cloneUpdateTb);
+ function.update(inPlaceUpdateRef, cloneUpdateTb, cursor);
}
/**
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/StorageType.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/StorageType.java
new file mode 100644
index 0000000..fb2d1eb
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/StorageType.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.dataflow.util;
+
+public enum StorageType {
+ TreeIndex, // set when the index dataflow helper is a TreeIndexDataflowHelper; copyUpdate may overwrite same-size values in place
+ LSMIndex // set for every other helper; copyUpdate skips the in-place path and uses the update buffer
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
index e224617..1e6359d 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/UpdateVertex.java
@@ -15,6 +15,7 @@
package edu.uci.ics.pregelix.example;
import java.util.Iterator;
+import java.util.Random;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
@@ -29,6 +30,7 @@
private final int MAX_VALUE_SIZE = 32768 / 2;
private VLongWritable msg = new VLongWritable();
private Text tempValue = new Text();
+ private Random rand = new Random();
@Override
public void compute(Iterator<VLongWritable> msgIterator) throws Exception {
@@ -52,28 +54,52 @@
int valueSize = getVertexValue().toString().toCharArray().length;
long expectedValueSize = msgIterator.next().get();
if (valueSize != expectedValueSize) {
- throw new IllegalStateException("vertex id: " + getVertexId() + " vertex value size:" + valueSize
- + ", expected value size:" + expectedValueSize);
+ if (valueSize == -expectedValueSize) {
+ //verify fixed size update
+ char[] valueCharArray = getVertexValue().toString().toCharArray();
+ for (int i = 0; i < valueCharArray.length; i++) {
+ if (valueCharArray[i] != 'b') {
+ throw new IllegalStateException("vertex id: " + getVertexId()
+ + " has a un-propagated update in the last iteration");
+ }
+ }
+ } else {
+ throw new IllegalStateException("vertex id: " + getVertexId() + " vertex value size:" + valueSize
+ + ", expected value size:" + expectedValueSize);
+ }
+ }
+ if (msgIterator.hasNext()) {
+ throw new IllegalStateException("more than one message for vertex " + " " + getVertexId() + " "
+ + getVertexValue());
}
}
private void updateAndSendMsg() {
- int newValueSize = (int) Math.pow(Math.abs(getVertexId().get()), getSuperstep()) % MAX_VALUE_SIZE;
+ int newValueSize = rand.nextInt(MAX_VALUE_SIZE);
char[] charArray = new char[newValueSize];
for (int i = 0; i < charArray.length; i++) {
charArray[i] = 'a';
}
/**
+ * set a self-message
+ */
+ msg.set(newValueSize);
+ boolean fixedSize = getVertexId().get() < 2000;
+ if (fixedSize) {
+ int oldSize = getVertexValue().toString().toCharArray().length;
+ charArray = new char[oldSize];
+ for (int i = 0; i < oldSize; i++) {
+ charArray[i] = 'b';
+ }
+ msg.set(-oldSize);
+ }
+
+ /**
* set the vertex value
*/
tempValue.set(new String(charArray));
setVertexValue(tempValue);
- /**
- * send a self-message
- */
- msg.set(newValueSize);
-
sendMsg(getVertexId(), msg);
activate();
}
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 6564eb0..245c0f5 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -141,6 +141,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-common</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-btree</artifactId>
<version>0.2.10-SNAPSHOT</version>
<type>jar</type>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index f3a0bb4..b1d1043 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -31,6 +31,9 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -110,7 +113,8 @@
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration(ctx);
- this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+ //LSM index does not have in-place update
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
@@ -262,7 +266,8 @@
}
@Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
if (!dynamicStateLength) {
@@ -271,12 +276,16 @@
int offset = tupleRef.getFieldStart(1);
bbos.setByteArray(data, offset);
vertex.write(output);
+
+ BTreeRangeSearchCursor btreeCursor = (BTreeRangeSearchCursor) cursor;
+ ICachedPageInternal page = (ICachedPageInternal) btreeCursor.getPage();
+ //IMPORTANT: mark the page to be dirty
+ page.markDirty();
} else {
// write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();
vertex.getVertexId().write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
-
// write the vertex value
vertex.write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
index 88577c2..8947c01 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -57,7 +58,8 @@
}
@Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index ca8ec01..05b4e87 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -31,6 +31,9 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -113,7 +116,8 @@
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration(ctx);
- this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+ //LSM index does not support in-place update, so force the copy-update (dynamic length) path
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
@@ -255,7 +259,8 @@
}
@Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
if (!dynamicStateLength) {
@@ -264,6 +269,11 @@
int offset = tupleRef.getFieldStart(1);
bbos.setByteArray(data, offset);
vertex.write(output);
+
+ BTreeRangeSearchCursor btreeCursor = (BTreeRangeSearchCursor) cursor;
+ ICachedPageInternal page = (ICachedPageInternal) btreeCursor.getPage();
+ //IMPORTANT: mark the page to be dirty
+ page.markDirty();
} else {
// write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();