refactoring dataflow operators to be more general--suport both b-tree and lsm b-tree
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 7c91c02..4175078 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -420,7 +420,7 @@
private M allocateMessage() {
M message;
if (usedMessage < msgPool.size()) {
- message = msgPool.get(usedEdge);
+ message = msgPool.get(usedMessage);
usedMessage++;
} else {
message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 877c6fa..548a4db 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -59,7 +59,7 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
@@ -133,7 +133,7 @@
vertexClass.getName());
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
- BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 6,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 7af1f79..1949172 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -59,7 +59,7 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
@@ -127,7 +127,7 @@
vertexClass.getName());
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
- BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 641774c..55a30f9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -58,7 +58,7 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
@@ -129,7 +129,7 @@
vertexClass.getName());
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
- BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index cec5c63..4d58326 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -58,7 +58,7 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
@@ -126,7 +126,7 @@
vertexClass.getName());
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
- BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index bd45509..238b775 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -28,13 +28,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -54,7 +51,7 @@
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
- private BTree btree;
+ private ITreeIndex index;
private PermutingFrameTupleReference lowKey;
private PermutingFrameTupleReference highKey;
private boolean lowKeyInclusive;
@@ -62,9 +59,8 @@
private RangePredicate rangePred;
private MultiComparator lowKeySearchCmp;
private MultiComparator highKeySearchCmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ private IIndexCursor cursor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private final IFrameWriter[] writers;
@@ -99,7 +95,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+ cursor = indexAccessor.createSearchCursor();
}
@Override
@@ -112,13 +108,11 @@
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
- setCursor();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
// TODO: Can we construct the multicmps using helper methods?
- int lowKeySearchFields = btree.getComparatorFactories().length;
- int highKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
+ int highKeySearchFields = index.getComparatorFactories().length;
if (lowKey != null)
lowKeySearchFields = lowKey.getFieldCount();
if (highKey != null)
@@ -126,7 +120,7 @@
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
@@ -135,10 +129,9 @@
} else {
IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
for (int i = 0; i < highKeySearchFields; i++) {
- highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
highKeySearchCmp = new MultiComparator(highKeySearchComparators);
-
}
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
@@ -147,9 +140,10 @@
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
- updateBuffer.setFieldCount(btree.getFieldCount());
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ setCursor();
+ cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+ updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.close();
throw new HyracksDataException(e);
@@ -201,7 +195,7 @@
try {
cursor.close();
//batch update
- updateBuffer.updateBTree(indexAccessor);
+ updateBuffer.updateIndex(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
index d3114d2..221a818 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
@@ -29,10 +29,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -48,7 +48,7 @@
private ArrayTupleBuilder tb;
private DataOutput dos;
- private BTree btree;
+ private ITreeIndex index;
private PermutingFrameTupleReference lowKey;
private PermutingFrameTupleReference highKey;
private boolean lowKeyInclusive;
@@ -57,7 +57,7 @@
private MultiComparator lowKeySearchCmp;
private MultiComparator highKeySearchCmp;
private IIndexCursor cursor;
- protected ITreeIndexAccessor indexAccessor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private final RecordDescriptor inputRecDesc;
@@ -91,11 +91,11 @@
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
writer.open();
- int lowKeySearchFields = btree.getComparatorFactories().length;
- int highKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
+ int highKeySearchFields = index.getComparatorFactories().length;
if (lowKey != null)
lowKeySearchFields = lowKey.getFieldCount();
if (highKey != null)
@@ -103,7 +103,7 @@
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
@@ -112,7 +112,7 @@
} else {
IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
for (int i = 0; i < highKeySearchFields; i++) {
- highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
highKeySearchCmp = new MultiComparator(highKeySearchComparators);
}
@@ -120,11 +120,11 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+ tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + index.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
setCursor();
} catch (Exception e) {
treeIndexOpHelper.close();
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index 289128e..06119ea 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -30,13 +30,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -60,13 +57,12 @@
private ArrayTupleBuilder nullTupleBuilder;
private DataOutput dos;
- private BTree btree;
+ private ITreeIndex index;
private RangePredicate rangePred;
private MultiComparator lowKeySearchCmp;
private MultiComparator highKeySearchCmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ private IIndexCursor cursor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private final RecordDescriptor inputRecDesc;
@@ -111,7 +107,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+ cursor = indexAccessor.createSearchCursor();
}
@Override
@@ -124,18 +120,16 @@
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
- setCursor();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
// construct range predicate
// TODO: Can we construct the multicmps using helper methods?
- int lowKeySearchFields = btree.getComparatorFactories().length;
- int highKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
+ int highKeySearchFields = index.getComparatorFactories().length;
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
@@ -144,7 +138,7 @@
} else {
IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
for (int i = 0; i < highKeySearchFields; i++) {
- highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
highKeySearchCmp = new MultiComparator(highKeySearchComparators);
}
@@ -164,7 +158,8 @@
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ setCursor();
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -179,8 +174,8 @@
match = false;
}
- cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
- updateBuffer.setFieldCount(btree.getFieldCount());
+ cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+ updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.close();
throw new HyracksDataException(e);
@@ -239,7 +234,7 @@
try {
cursor.close();
//batch update
- updateBuffer.updateBTree(indexAccessor);
+ updateBuffer.updateIndex(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index 6f7643e..48812b7 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -30,13 +30,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -53,14 +50,13 @@
private ArrayTupleBuilder tb;
private DataOutput dos;
- private BTree btree;
+ private ITreeIndex index;
private boolean isForward;
private RangePredicate rangePred;
private MultiComparator lowKeySearchCmp;
private MultiComparator highKeySearchCmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ private IIndexCursor cursor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private final RecordDescriptor inputRecDesc;
@@ -93,7 +89,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+ cursor = indexAccessor.createSearchCursor();
}
@Override
@@ -101,19 +97,18 @@
accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
setCursor();
writer.open();
// construct range predicate
// TODO: Can we construct the multicmps using helper methods?
- int lowKeySearchFields = btree.getComparatorFactories().length;
- int highKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
+ int highKeySearchFields = index.getComparatorFactories().length;
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
@@ -122,7 +117,7 @@
} else {
IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
for (int i = 0; i < highKeySearchFields; i++) {
- highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
highKeySearchCmp = new MultiComparator(highKeySearchComparators);
@@ -131,12 +126,12 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+ tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + index.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index c978614..ee821df 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -28,13 +28,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -55,13 +52,12 @@
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
- private BTree btree;
+ private ITreeIndex index;
private boolean isForward;
private RangePredicate rangePred;
private MultiComparator lowKeySearchCmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ private IIndexCursor cursor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private PermutingFrameTupleReference lowKey;
@@ -102,7 +98,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+ cursor = indexAccessor.createSearchCursor();
}
@Override
@@ -112,15 +108,13 @@
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
- setCursor();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
rangePred = new RangePredicate(null, null, true, true, null, null);
- int lowKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
@@ -128,7 +122,8 @@
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ setCursor();
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -142,8 +137,8 @@
currentTopTuple = cursor.getTuple();
match = false;
}
- cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
- updateBuffer.setFieldCount(btree.getFieldCount());
+ cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+ updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.close();
throw new HyracksDataException(e);
@@ -210,7 +205,7 @@
cursor.close();
//batch update
- updateBuffer.updateBTree(indexAccessor);
+ updateBuffer.updateIndex(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index eef7c85..d6f95f9 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -29,13 +29,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -51,12 +48,11 @@
private ArrayTupleBuilder tb;
private DataOutput dos;
- private BTree btree;
+ private ITreeIndex index;
private RangePredicate rangePred;
private MultiComparator lowKeySearchCmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ private IIndexCursor cursor;
+ protected IIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
private final RecordDescriptor inputRecDesc;
@@ -86,7 +82,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+ cursor = indexAccessor.createSearchCursor();
}
@Override
@@ -95,26 +91,25 @@
try {
treeIndexOpHelper.open();
- btree = (BTree) treeIndexOpHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
- setCursor();
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
writer.open();
rangePred = new RangePredicate(null, null, true, true, null, null);
- int lowKeySearchFields = btree.getComparatorFactories().length;
+ int lowKeySearchFields = index.getComparatorFactories().length;
IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(btree.getFieldCount());
+ tb = new ArrayTupleBuilder(index.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ setCursor();
/** set the search cursor */
rangePred.setLowKey(null, true);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorDescriptor.java
similarity index 93%
rename from pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
rename to pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorDescriptor.java
index 784288f..f651d68 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorDescriptor.java
@@ -33,7 +33,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
-public class BTreeSearchFunctionUpdateOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+public class TreeSearchFunctionUpdateOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -52,7 +52,7 @@
private final int outputArity;
- public BTreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+ public TreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -84,7 +84,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new BTreeSearchFunctionUpdateOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
+ return new TreeSearchFunctionUpdateOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, functionFactory, preHookFactory,
postHookFactory, inputRdFactory, outputArity);
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
similarity index 76%
rename from pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
rename to pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index 0216b52..13aab31 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -27,13 +28,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
@@ -48,7 +46,7 @@
import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
-public class BTreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+public class TreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
protected TreeIndexDataflowHelper treeIndexHelper;
protected FrameTupleAccessor accessor;
@@ -57,7 +55,7 @@
protected ArrayTupleBuilder tb;
protected DataOutput dos;
- protected BTree btree;
+ protected ITreeIndex index;
protected boolean isForward;
protected PermutingFrameTupleReference lowKey;
protected PermutingFrameTupleReference highKey;
@@ -66,9 +64,11 @@
protected RangePredicate rangePred;
protected MultiComparator lowKeySearchCmp;
protected MultiComparator highKeySearchCmp;
- protected ITreeIndexCursor cursor;
+ protected IIndexCursor cursor;
protected ITreeIndexFrame cursorFrame;
- protected ITreeIndexAccessor indexAccessor;
+ protected IIndexAccessor indexAccessor;
+ protected int[] lowKeyFields;
+ protected int[] highKeyFields;
protected RecordDescriptor recDesc;
@@ -78,7 +78,7 @@
private final UpdateBuffer updateBuffer;
private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
- public BTreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+ public TreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
@@ -88,6 +88,8 @@
this.isForward = isForward;
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
+ this.lowKeyFields = lowKeyFields;
+ this.highKeyFields = highKeyFields;
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
if (lowKeyFields != null && lowKeyFields.length > 0) {
lowKey = new PermutingFrameTupleReference();
@@ -114,25 +116,46 @@
try {
treeIndexHelper.open();
- btree = (BTree) treeIndexHelper.getIndexInstance();
- cursorFrame = btree.getLeafFrameFactory().createFrame();
- setCursor();
+ index = (ITreeIndex) treeIndexHelper.getIndexInstance();
+ cursorFrame = index.getLeafFrameFactory().createFrame();
// Construct range predicate.
- lowKeySearchCmp = BTreeUtils.getSearchMultiComparator(btree.getComparatorFactories(), lowKey);
- highKeySearchCmp = BTreeUtils.getSearchMultiComparator(btree.getComparatorFactories(), highKey);
+ int lowKeySearchFields = index.getComparatorFactories().length;
+ int highKeySearchFields = index.getComparatorFactories().length;
+ if (lowKey != null)
+ lowKeySearchFields = lowKey.getFieldCount();
+ if (highKey != null)
+ highKeySearchFields = highKey.getFieldCount();
+
+ IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
+ for (int i = 0; i < lowKeySearchFields; i++) {
+ lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
+ }
+ lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
+
+ if (lowKeySearchFields == highKeySearchFields) {
+ highKeySearchCmp = lowKeySearchCmp;
+ } else {
+ IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
+ for (int i = 0; i < highKeySearchFields; i++) {
+ highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
+ }
+ highKeySearchCmp = new MultiComparator(highKeySearchComparators);
+ }
+
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
writeBuffer = treeIndexHelper.getTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(btree.getFieldCount());
+ tb = new ArrayTupleBuilder(index.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ setCursor();
- cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
- updateBuffer.setFieldCount(btree.getFieldCount());
+ cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+ updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
treeIndexHelper.close();
throw new HyracksDataException(e);
@@ -140,7 +163,7 @@
}
protected void setCursor() {
- cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+ cursor = indexAccessor.createSearchCursor();
}
protected void writeSearchResults() throws Exception {
@@ -184,7 +207,7 @@
try {
cursor.close();
//batch update
- updateBuffer.updateBTree(indexAccessor);
+ updateBuffer.updateIndex(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index b141eaf..8709301 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -19,15 +19,15 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public class CopyUpdateUtil {
public static void copyUpdate(SearchKeyTupleReference tempTupleReference, ITupleReference frameTuple,
- UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, ITreeIndexAccessor indexAccessor,
- ITreeIndexCursor cursor, RangePredicate rangePred) throws HyracksDataException, IndexException {
+ UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, IIndexAccessor indexAccessor,
+ IIndexCursor cursor, RangePredicate rangePred) throws HyracksDataException, IndexException {
if (cloneUpdateTb.getSize() > 0) {
int[] fieldEndOffsets = cloneUpdateTb.getFieldEndOffsets();
int srcStart = fieldEndOffsets[0];
@@ -46,7 +46,7 @@
//release the cursor/latch
cursor.close();
//batch update
- updateBuffer.updateBTree(indexAccessor);
+ updateBuffer.updateIndex(indexAccessor);
//try append the to-be-updated tuple again
if (!updateBuffer.appendTuple(cloneUpdateTb)) {
throw new HyracksDataException("cannot append tuple builder!");
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index 327178c..1ff3959 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -25,7 +25,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
/**
@@ -87,7 +87,7 @@
}
}
- public void updateBTree(ITreeIndexAccessor bta) throws HyracksDataException, IndexException {
+ public void updateIndex(IIndexAccessor bta) throws HyracksDataException, IndexException {
// batch update
for (int i = 0; i <= currentInUse; i++) {
ByteBuffer buffer = buffers.get(i);