Using btree upsert in lsmbtree
Added mechanism for passing around operation callbacks
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1307 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index 301d1bd..88fef58 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -48,6 +48,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
@@ -151,9 +152,9 @@
// create operator descriptor
TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, recDesc, storageManager, indexRegistryProvider, primarySplitProvider,
- primaryTypeTraits, primaryComparatorFactories,
- primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory);
+ spec, recDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
JobHelper.createPartitionConstraint(spec, primaryInsert, splitNCs);
// prepare insertion into secondary index
@@ -175,9 +176,9 @@
options.secondaryBTreeName);
// create operator descriptor
TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, recDesc, storageManager, indexRegistryProvider, secondarySplitProvider,
- secondaryTypeTraits,
- secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory);
+ spec, recDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+ secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
JobHelper.createPartitionConstraint(spec, secondaryInsert, splitNCs);
// end the insert pipeline at this sink operator
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 9375285..8f77c15 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -47,6 +47,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
// This example will load a primary index from randomly generated data
@@ -152,8 +153,8 @@
IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManager, indexRegistryProvider, btreeSplitProvider,
- typeTraits, comparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory);
+ storageManager, indexRegistryProvider, btreeSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, 0.7f, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
// distribute the records from the datagen via hashing to the bulk load
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index 76aa660..1a43a0b 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -45,6 +45,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
// This example will perform an ordered scan on the primary index
@@ -142,8 +143,8 @@
IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(spec, recDesc, storageManager,
- indexRegistryProvider, btreeSplitProvider, typeTraits,
- comparatorFactories, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory);
+ indexRegistryProvider, btreeSplitProvider, typeTraits, comparatorFactories, lowKeyFields,
+ highKeyFields, true, true, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
JobHelper.createPartitionConstraint(spec, btreeSearchOp, splitNCs);
// have each node print the results of its respective B-Tree
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 729fae4..11c3ed6 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -41,6 +41,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDiskOrderScanOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
// This example will load a secondary index with <key, primary-index key> pairs
@@ -122,8 +123,8 @@
IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
TreeIndexDiskOrderScanOperatorDescriptor btreeScanOp = new TreeIndexDiskOrderScanOperatorDescriptor(spec,
- recDesc, storageManager, indexRegistryProvider, primarySplitProvider,
- primaryTypeTraits, dataflowHelperFactory);
+ recDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+ dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
JobHelper.createPartitionConstraint(spec, btreeScanOp, splitNCs);
// sort the tuples as preparation for bulk load into secondary index
@@ -144,9 +145,8 @@
int[] fieldPermutation = { 1, 0 };
IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManager, indexRegistryProvider, btreeSplitProvider,
- secondaryTypeTraits, comparatorFactories, fieldPermutation, 0.7f,
- dataflowHelperFactory);
+ storageManager, indexRegistryProvider, btreeSplitProvider, secondaryTypeTraits, comparatorFactories,
+ fieldPermutation, 0.7f, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
// connect the ops
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index 67aa546..5f9d2bc 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -45,6 +45,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
// This example will perform range search on the secondary index
@@ -168,9 +169,9 @@
options.secondaryBTreeName);
IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
- storageManager, indexRegistryProvider, secondarySplitProvider,
- secondaryTypeTraits, searchComparatorFactories, secondaryLowKeyFields,
- secondaryHighKeyFields, true, true, dataflowHelperFactory);
+ storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+ searchComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+ dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
JobHelper.createPartitionConstraint(spec, secondarySearchOp, splitNCs);
// secondary index will output tuples with [UTF8String, Integer]
@@ -184,9 +185,9 @@
IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- storageManager, indexRegistryProvider, primarySplitProvider,
- primaryTypeTraits, primaryComparatorFactories, primaryLowKeyFields,
- primaryHighKeyFields, true, true, dataflowHelperFactory);
+ storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
+ dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
JobHelper.createPartitionConstraint(spec, primarySearchOp, splitNCs);
// have each node print the results of its respective B-Tree
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 9d252bc..f512ba0 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -50,6 +50,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.test.support.TestIndexRegistryProvider;
@@ -158,7 +159,8 @@
int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
- primaryComparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory);
+ primaryComparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -194,7 +196,8 @@
// scan primary index
BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
- primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory);
+ primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
// sort based on secondary keys
@@ -207,7 +210,8 @@
int[] fieldPermutation = { 3, 0 };
TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory);
+ secondaryComparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryBtreeSearchOp, 0);
@@ -218,8 +222,8 @@
runTest(spec);
}
- protected void insertPipeline() throws Exception {
-
+ protected void insertPipeline(boolean useUpsert) throws Exception {
+ IndexOp pipelineOperation = useUpsert ? IndexOp.INSERT : IndexOp.UPSERT;
JobSpecification spec = new JobSpecification();
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
@@ -244,14 +248,16 @@
int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7 };
TreeIndexInsertUpdateDeleteOperatorDescriptor primaryBtreeInsertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, ordersDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
- primaryComparatorFactories, primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory);
+ primaryComparatorFactories, primaryFieldPermutation, pipelineOperation, dataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeInsertOp, NC1_ID);
// first secondary index
int[] fieldPermutationB = { 4, 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, ordersDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutationB, IndexOp.INSERT, dataflowHelperFactory);
+ secondaryComparatorFactories, fieldPermutationB, pipelineOperation, dataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryInsertOp, NC1_ID);
NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexScanOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexScanOperatorTest.java
index 66ef309..3bdaa5a 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexScanOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexScanOperatorTest.java
@@ -35,9 +35,10 @@
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
public class BTreePrimaryIndexScanOperatorTest extends AbstractBTreeOperatorTest {
-
+
@Before
public void setup() throws Exception {
super.setup();
@@ -68,9 +69,9 @@
int[] highKeyFields = null; // + infinity
BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- storageManager, indexRegistryProvider, primarySplitProvider,
- primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
- highKeyFields, true, true, dataflowHelperFactory);
+ storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -84,7 +85,7 @@
spec.addRoot(printer);
runTest(spec);
}
-
+
@Override
protected IIndexDataflowHelperFactory createDataFlowHelperFactory() {
return ((BTreeOperatorTestHelper) testHelper).createDataFlowHelperFactory();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexSearchOperatorTest.java
index 1ecb84c..878507b 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexSearchOperatorTest.java
@@ -35,9 +35,10 @@
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
public class BTreePrimaryIndexSearchOperatorTest extends AbstractBTreeOperatorTest {
-
+
@Before
public void setup() throws Exception {
super.setup();
@@ -73,9 +74,9 @@
int[] highKeyFields = { 1 };
BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- storageManager, indexRegistryProvider, primarySplitProvider,
- primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
- highKeyFields, true, true, dataflowHelperFactory);
+ storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -89,7 +90,7 @@
spec.addRoot(printer);
runTest(spec);
}
-
+
@Override
protected IIndexDataflowHelperFactory createDataFlowHelperFactory() {
return ((BTreeOperatorTestHelper) testHelper).createDataFlowHelperFactory();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexStatsOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexStatsOperatorTest.java
index 22f97f7..326a4b9 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexStatsOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexStatsOperatorTest.java
@@ -28,22 +28,23 @@
import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexStatsOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
public class BTreePrimaryIndexStatsOperatorTest extends AbstractBTreeOperatorTest {
-
+
@Before
public void setup() throws Exception {
super.setup();
loadPrimaryIndex();
}
-
+
@Test
public void showPrimaryIndexStats() throws Exception {
JobSpecification spec = new JobSpecification();
TreeIndexStatsOperatorDescriptor primaryStatsOp = new TreeIndexStatsOperatorDescriptor(spec, storageManager,
- indexRegistryProvider, primarySplitProvider,
- primaryTypeTraits, primaryComparatorFactories, dataflowHelperFactory);
+ indexRegistryProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
+ dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryStatsOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
createTempFile().getAbsolutePath()) });
@@ -54,7 +55,7 @@
spec.addRoot(printer);
runTest(spec);
}
-
+
@Override
protected IIndexDataflowHelperFactory createDataFlowHelperFactory() {
return ((BTreeOperatorTestHelper) testHelper).createDataFlowHelperFactory();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexInsertOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexInsertOperatorTest.java
index e48294f..50bc259 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexInsertOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexInsertOperatorTest.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
public class BTreeSecondaryIndexInsertOperatorTest extends AbstractBTreeOperatorTest {
@@ -43,7 +44,7 @@
super.setup();
loadPrimaryIndex();
loadSecondaryIndex();
- insertPipeline();
+ insertPipeline(false);
}
@Test
@@ -76,9 +77,9 @@
// search secondary index
BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
- secondaryRecDesc, storageManager, indexRegistryProvider, secondarySplitProvider,
- secondaryTypeTraits, secondaryComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
- dataflowHelperFactory);
+ secondaryRecDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+ secondaryComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+ dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeSearchOp, NC1_ID);
// second field from the tuples coming from secondary index
@@ -88,9 +89,9 @@
// search primary index
BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- storageManager, indexRegistryProvider, primarySplitProvider,
- primaryTypeTraits, primaryComparatorFactories, primaryLowKeyFields,
- primaryHighKeyFields, true, true, dataflowHelperFactory);
+ storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
+ dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -105,7 +106,7 @@
spec.addRoot(printer);
runTest(spec);
}
-
+
@Override
protected IIndexDataflowHelperFactory createDataFlowHelperFactory() {
return ((BTreeOperatorTestHelper) testHelper).createDataFlowHelperFactory();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexSearchOperatorTest.java
index b1b6e39..cc8fe12 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexSearchOperatorTest.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
public class BTreeSecondaryIndexSearchOperatorTest extends AbstractBTreeOperatorTest {
@@ -77,7 +78,7 @@
BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
secondaryRecDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
secondaryComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
- dataflowHelperFactory);
+ dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeSearchOp, NC1_ID);
int[] primaryLowKeyFields = { 1 }; // second field from the tuples
@@ -89,7 +90,7 @@
BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
- dataflowHelperFactory);
+ dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexUpsertOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexUpsertOperatorTest.java
new file mode 100644
index 0000000..374d1a0
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexUpsertOperatorTest.java
@@ -0,0 +1,99 @@
+package edu.uci.ics.hyracks.tests.am.btree;
+
+import java.io.DataOutput;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+
+public class BTreeSecondaryIndexUpsertOperatorTest extends AbstractBTreeOperatorTest {
+
+ @Before
+ public void setup() throws Exception {
+ super.setup();
+ loadPrimaryIndex();
+ loadSecondaryIndex();
+ insertPipeline(true);
+ }
+
+ @Test
+ public void searchUpdatedSecondaryIndexTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ // build tuple containing search keys (only use the first key as search
+ // key)
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(secondaryKeyFieldCount);
+ DataOutput dos = tb.getDataOutput();
+
+ tb.reset();
+ // low key
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("1998-07-21", dos);
+ tb.addFieldEndOffset();
+ // high key
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("2000-10-18", dos);
+ tb.addFieldEndOffset();
+
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
+
+ int[] secondaryLowKeyFields = { 0 };
+ int[] secondaryHighKeyFields = { 1 };
+
+ // search secondary index
+ BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
+ secondaryRecDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+ secondaryComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+ dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeSearchOp, NC1_ID);
+
+ // second field from the tuples coming from secondary index
+ int[] primaryLowKeyFields = { 1 };
+ // second field from the tuples coming from secondary index
+ int[] primaryHighKeyFields = { 1 };
+
+ // search primary index
+ BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+ storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
+ dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
+
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondaryBtreeSearchOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBtreeSearchOp, 0, primaryBtreeSearchOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryBtreeSearchOp, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Override
+ protected IIndexDataflowHelperFactory createDataFlowHelperFactory() {
+ return ((BTreeOperatorTestHelper) testHelper).createDataFlowHelperFactory();
+ }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index a3afcbd..c54deea 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -56,6 +56,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexSearchModifierFactory;
import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.InvertedIndexBulkLoadOperatorDescriptor;
@@ -165,7 +166,8 @@
int[] fieldPermutation = { 0, 1 };
TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, indexRegistryProvider, primaryFileSplitProvider, primaryTypeTraits,
- primaryComparatorFactories, fieldPermutation, 0.7f, btreeDataflowHelperFactory);
+ primaryComparatorFactories, fieldPermutation, 0.7f, btreeDataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
return primaryBtreeBulkLoad;
}
@@ -191,7 +193,8 @@
int[] highKeyFields = null; // + infinity
BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
storageManager, indexRegistryProvider, primaryFileSplitProvider, primaryTypeTraits,
- primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory);
+ primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
return primaryBtreeSearchOp;
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 4848920..e35d181 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -55,6 +55,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
@@ -203,7 +204,8 @@
int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
- primaryComparatorFactories, fieldPermutation, 0.7f, btreeDataflowHelperFactory);
+ primaryComparatorFactories, fieldPermutation, 0.7f, btreeDataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBulkLoad, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -239,14 +241,16 @@
// scan primary index
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
- primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory);
+ primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primarySearchOp, NC1_ID);
// load secondary index
int[] fieldPermutation = { 6, 7, 8, 9, 0 };
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutation, 0.7f, rtreeDataflowHelperFactory);
+ secondaryComparatorFactories, fieldPermutation, 0.7f, rtreeDataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBulkLoad, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
@@ -286,14 +290,16 @@
int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, ordersDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
- primaryComparatorFactories, primaryFieldPermutation, IndexOp.INSERT, btreeDataflowHelperFactory);
+ primaryComparatorFactories, primaryFieldPermutation, IndexOp.INSERT, btreeDataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryInsertOp, NC1_ID);
// secondary index
int[] secondaryFieldPermutation = { 6, 7, 8, 9, 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, ordersDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, rtreeDataflowHelperFactory);
+ secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, rtreeDataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryInsertOp, NC1_ID);
NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
index d75c0bc..83fc78a 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
@@ -37,6 +37,7 @@
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
public class RTreeSecondaryIndexInsertOperatorTest extends AbstractRTreeOperatorTest {
@@ -80,7 +81,8 @@
RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory);
+ secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
// fifth field from the tuples coming from secondary index
@@ -92,7 +94,7 @@
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
- btreeDataflowHelperFactory);
+ btreeDataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primarySearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
index 21fbd91..0d1b319 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
public class RTreeSecondaryIndexScanOperatorTest extends AbstractRTreeOperatorTest {
@@ -78,7 +79,8 @@
RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory);
+ secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
index 0e3203d..e804a45 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
@@ -37,6 +37,7 @@
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
public class RTreeSecondaryIndexSearchOperatorTest extends AbstractRTreeOperatorTest {
@@ -79,7 +80,8 @@
RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory);
+ secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory,
+ NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
// fifth field from the tuples coming from secondary index
@@ -91,7 +93,7 @@
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
- btreeDataflowHelperFactory);
+ btreeDataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primarySearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexStatsOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexStatsOperatorTest.java
index 7c308cd..d858b95 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexStatsOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexStatsOperatorTest.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexStatsOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
public class RTreeSecondaryIndexStatsOperatorTest extends AbstractRTreeOperatorTest {
@@ -46,7 +47,7 @@
TreeIndexStatsOperatorDescriptor secondaryStatsOp = new TreeIndexStatsOperatorDescriptor(spec, storageManager,
indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits, secondaryComparatorFactories,
- rtreeDataflowHelperFactory);
+ rtreeDataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryStatsOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
createTempFile().getAbsolutePath()) });
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
index a31a57c..9bb3ffa 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
@@ -20,23 +20,22 @@
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeException;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
public class BTreeDataflowHelper extends TreeIndexDataflowHelper {
- public BTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- boolean createIfNotExists) {
- super(opDesc, ctx, partition, createIfNotExists);
+ public BTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+ super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
}
@Override
public ITreeIndex createIndexInstance() throws HyracksDataException {
try {
- // TODO: Figure out where to get the proper operation callback from.
return BTreeUtils.createBTree(opDesc.getStorageManager().getBufferCache(ctx),
- NoOpOperationCallback.INSTANCE, treeOpDesc.getTreeIndexTypeTraits(),
+ opCallbackProvider.getOperationCallback(), treeOpDesc.getTreeIndexTypeTraits(),
treeOpDesc.getTreeIndexComparatorFactories(), BTreeLeafFrameType.REGULAR_NSM);
} catch (BTreeException e) {
throw new HyracksDataException(e);
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelperFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelperFactory.java
index 308e305..7dcc5f3 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelperFactory.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelperFactory.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.btree.dataflow;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
@@ -26,7 +27,7 @@
@Override
public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, boolean createIfNotExists) {
- return new BTreeDataflowHelper(opDesc, ctx, partition, createIfNotExists);
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+ return new BTreeDataflowHelper(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
}
}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 6e05922..5883a1b 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -44,9 +45,10 @@
IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, int[] lowKeyFields, int[] highKeyFields,
- boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory) {
+ boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory,
+ IOperationCallbackProvider opCallbackProvider) {
super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, dataflowHelperFactory);
+ comparatorFactories, dataflowHelperFactory, opCallbackProvider);
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
this.lowKeyInclusive = lowKeyInclusive;
@@ -56,7 +58,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new BTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, lowKeyFields,
- highKeyFields, lowKeyInclusive, highKeyInclusive);
+ return new BTreeSearchOperatorNodePushable(this, ctx, opCallbackProvider, partition, recordDescProvider,
+ lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 10c5b6e..d583d04 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -18,6 +18,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
@@ -25,17 +26,17 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
public class BTreeSearchOperatorNodePushable extends TreeIndexSearchOperatorNodePushable {
- protected PermutingFrameTupleReference lowKey;
- protected PermutingFrameTupleReference highKey;
- protected boolean lowKeyInclusive;
- protected boolean highKeyInclusive;
- protected MultiComparator lowKeySearchCmp;
- protected MultiComparator highKeySearchCmp;
+ protected PermutingFrameTupleReference lowKey;
+ protected PermutingFrameTupleReference highKey;
+ protected boolean lowKeyInclusive;
+ protected boolean highKeyInclusive;
+ protected MultiComparator lowKeySearchCmp;
+ protected MultiComparator highKeySearchCmp;
public BTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, IRecordDescriptorProvider recordDescProvider, int[] lowKeyFields,
- int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) {
- super(opDesc, ctx, partition, recordDescProvider);
+ IOperationCallbackProvider opCallbackProvider, int partition, IRecordDescriptorProvider recordDescProvider,
+ int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) {
+ super(opDesc, ctx, opCallbackProvider, partition, recordDescProvider);
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
@@ -58,13 +59,11 @@
highKey.reset(accessor, tupleIndex);
}
}
-
+
@Override
protected ISearchPredicate createSearchPredicate() {
lowKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
- highKeySearchCmp = BTreeUtils
- .getSearchMultiComparator(treeIndex.getComparatorFactories(), highKey);
- return new RangePredicate(lowKey, highKey, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
- highKeySearchCmp);
+ highKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), highKey);
+ return new RangePredicate(lowKey, highKey, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp, highKeySearchCmp);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorDescriptor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorDescriptor.java
index be967ef..5df52c1 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorDescriptor.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleUpdaterFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -40,16 +41,18 @@
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory,
- ITupleUpdaterFactory tupleUpdaterFactory) {
+ IOperationCallbackProvider opCallbackProvider, ITupleUpdaterFactory tupleUpdaterFactory) {
super(spec, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, dataflowHelperFactory);
+ lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, dataflowHelperFactory,
+ opCallbackProvider);
this.tupleUpdaterFactory = tupleUpdaterFactory;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new BTreeUpdateSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, lowKeyFields,
- highKeyFields, lowKeyInclusive, highKeyInclusive, tupleUpdaterFactory.createTupleUpdater());
+ return new BTreeUpdateSearchOperatorNodePushable(this, ctx, opCallbackProvider, partition, recordDescProvider,
+ lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+ tupleUpdaterFactory.createTupleUpdater());
}
}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
index 5a70e87..903eca2 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
@@ -21,31 +21,30 @@
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleUpdater;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
public class BTreeUpdateSearchOperatorNodePushable extends BTreeSearchOperatorNodePushable {
private final ITupleUpdater tupleUpdater;
-
- public BTreeUpdateSearchOperatorNodePushable(
- AbstractTreeIndexOperatorDescriptor opDesc,
- IHyracksTaskContext ctx, int partition,
- IRecordDescriptorProvider recordDescProvider,
- int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
- boolean highKeyInclusive, ITupleUpdater tupleUpdater) {
- super(opDesc, ctx, partition, recordDescProvider, lowKeyFields,
- highKeyFields, lowKeyInclusive, highKeyInclusive);
- this.tupleUpdater = tupleUpdater;
- }
- @Override
- protected ITreeIndexCursor createCursor() {
+ public BTreeUpdateSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ IOperationCallbackProvider opCallbackProvider, int partition, IRecordDescriptorProvider recordDescProvider,
+ int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+ ITupleUpdater tupleUpdater) {
+ super(opDesc, ctx, opCallbackProvider, partition, recordDescProvider, lowKeyFields, highKeyFields,
+ lowKeyInclusive, highKeyInclusive);
+ this.tupleUpdater = tupleUpdater;
+ }
+
+ @Override
+ protected ITreeIndexCursor createCursor() {
return new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
}
-
- @Override
- protected void writeSearchResults() throws Exception {
+
+ @Override
+ protected void writeSearchResults() throws Exception {
while (cursor.hasNext()) {
tb.reset();
cursor.next();
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IOperationCallbackProvider.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IOperationCallbackProvider.java
new file mode 100644
index 0000000..974ef1a
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IOperationCallbackProvider.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.common.api;
+
+import java.io.Serializable;
+
+public interface IOperationCallbackProvider extends Serializable {
+ public IOperationCallback getOperationCallback();
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java
index 2e2b67e..ab668d1 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
public abstract class AbstractTreeIndexOperatorDescriptor extends
@@ -39,6 +40,8 @@
protected final ITypeTraits[] typeTraits;
protected final IIndexDataflowHelperFactory dataflowHelperFactory;
+ protected final IOperationCallbackProvider opCallbackProvider;
+
public AbstractTreeIndexOperatorDescriptor(JobSpecification spec,
int inputArity, int outputArity, RecordDescriptor recDesc,
IStorageManagerInterface storageManager,
@@ -46,7 +49,8 @@
IFileSplitProvider fileSplitProvider,
ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories,
- IIndexDataflowHelperFactory dataflowHelperFactory) {
+ IIndexDataflowHelperFactory dataflowHelperFactory,
+ IOperationCallbackProvider opCallbackProvider) {
super(spec, inputArity, outputArity);
this.fileSplitProvider = fileSplitProvider;
this.storageManager = storageManager;
@@ -54,6 +58,7 @@
this.typeTraits = typeTraits;
this.comparatorFactories = comparatorFactories;
this.dataflowHelperFactory = dataflowHelperFactory;
+ this.opCallbackProvider = opCallbackProvider;
if (outputArity > 0) {
recordDescriptors[0] = recDesc;
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
index a42cf5f..344929c 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
@@ -18,8 +18,10 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
public interface IIndexDataflowHelperFactory extends Serializable {
public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc,
- final IHyracksTaskContext ctx, int partition, boolean createIfNotExists);
-}
+ final IHyracksTaskContext ctx, IOperationCallbackProvider opCallbackProvider, int partition,
+ boolean createIfNotExists);
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 99ebab7..062e7eb 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -26,22 +27,24 @@
protected IIndex index;
protected int indexFileId = -1;
protected int partition;
-
+
protected final IIndexOperatorDescriptor opDesc;
protected final IHyracksTaskContext ctx;
+ protected final IOperationCallbackProvider opCallbackProvider;
protected final boolean createIfNotExists;
-
+
public IndexDataflowHelper(IIndexOperatorDescriptor opDesc, final IHyracksTaskContext ctx,
- int partition, boolean createIfNotExists) {
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+ this.opCallbackProvider = opCallbackProvider;
this.opDesc = opDesc;
- this.ctx = ctx;
+ this.ctx = ctx;
this.partition = partition;
this.createIfNotExists = createIfNotExists;
}
public void init() throws HyracksDataException {
IBufferCache bufferCache = opDesc.getStorageManager().getBufferCache(ctx);
- IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
+ IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
FileReference f = getFilereference();
int fileId = -1;
@@ -75,7 +78,7 @@
}
index = createIndexInstance();
if (createIfNotExists) {
- index.create(indexFileId);
+ index.create(indexFileId);
}
index.open(indexFileId);
indexRegistry.register(indexFileId, index);
@@ -88,7 +91,7 @@
IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
return fileSplitProvider.getFileSplits()[partition].getLocalFile();
}
-
+
public void deinit() throws HyracksDataException {
if (indexFileId != -1) {
IBufferCache bufferCache = opDesc.getStorageManager().getBufferCache(ctx);
@@ -111,4 +114,8 @@
public int getIndexFileId() {
return indexFileId;
}
+
+ public IOperationCallbackProvider getOpCallbackProvider() {
+ return opCallbackProvider;
+ }
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index 90295be..ca74160 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
public class TreeIndexBulkLoadOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -34,9 +35,10 @@
public TreeIndexBulkLoadOperatorDescriptor(JobSpecification spec, IStorageManagerInterface storageManager,
IIndexRegistryProvider<IIndex> indexRegistryProvider, IFileSplitProvider fileSplitProvider,
ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation,
- float fillFactor, IIndexDataflowHelperFactory dataflowHelperFactory) {
- super(spec, 1, 0, null, storageManager, indexRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories, dataflowHelperFactory);
+ float fillFactor, IIndexDataflowHelperFactory dataflowHelperFactory,
+ IOperationCallbackProvider opCallbackProvider) {
+ super(spec, 1, 0, null, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, dataflowHelperFactory, opCallbackProvider);
this.fieldPermutation = fieldPermutation;
this.fillFactor = fillFactor;
}
@@ -44,7 +46,7 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new TreeIndexBulkLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor,
- recordDescProvider);
+ return new TreeIndexBulkLoadOperatorNodePushable(this, ctx, opCallbackProvider, partition, fieldPermutation,
+ fillFactor, recordDescProvider);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
index fe3b10f..f2031ca 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
public class TreeIndexBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
@@ -37,9 +38,11 @@
private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
public TreeIndexBulkLoadOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
+ IOperationCallbackProvider opCallbackProvider, int partition, int[] fieldPermutation, float fillFactor,
+ IRecordDescriptorProvider recordDescProvider) {
+
treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition, true);
+ opDesc, ctx, opCallbackProvider, partition, true);
this.fillFactor = fillFactor;
this.recordDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
index 17c1827..c9e7e08 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
@@ -24,14 +25,15 @@
public abstract class TreeIndexDataflowHelper extends IndexDataflowHelper {
protected ITreeIndexOperatorDescriptor treeOpDesc;
- public TreeIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- boolean createIfNotExists) {
- super(opDesc, ctx, partition, createIfNotExists);
+
+ public TreeIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+ super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
this.treeOpDesc = (ITreeIndexOperatorDescriptor) opDesc;
}
public abstract ITreeIndex createIndexInstance() throws HyracksDataException;
-
+
public ITreeIndexCursor createDiskOrderScanCursor(ITreeIndexFrame leafFrame) throws HyracksDataException {
return new TreeDiskOrderScanCursor(leafFrame);
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
index ab16acb..4bf308e 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
public class TreeIndexDiskOrderScanOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -31,14 +32,14 @@
public TreeIndexDiskOrderScanOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
- IIndexDataflowHelperFactory dataflowHelperFactory) {
+ IIndexDataflowHelperFactory dataflowHelperFactory, IOperationCallbackProvider opCallbackProvider) {
super(spec, 0, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits, null,
- dataflowHelperFactory);
+ dataflowHelperFactory, opCallbackProvider);
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new TreeIndexDiskOrderScanOperatorNodePushable(this, ctx, partition);
+ return new TreeIndexDiskOrderScanOperatorNodePushable(this, ctx, opCallbackProvider, partition);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index d207314..78c0a82 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
@@ -34,9 +35,9 @@
private ITreeIndex treeIndex;
public TreeIndexDiskOrderScanOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
- IHyracksTaskContext ctx, int partition) {
+ IHyracksTaskContext ctx, IOperationCallbackProvider opCallbackProvider, int partition) {
treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition, false);
+ opDesc, ctx, opCallbackProvider, partition, false);
}
@Override
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
index ac90c1a..a06b37b 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
@@ -38,9 +39,9 @@
IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, IndexOp op,
- IIndexDataflowHelperFactory dataflowHelperFactory) {
+ IIndexDataflowHelperFactory dataflowHelperFactory, IOperationCallbackProvider opCallbackProvider) {
super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, dataflowHelperFactory);
+ comparatorFactories, dataflowHelperFactory, opCallbackProvider);
this.fieldPermutation = fieldPermutation;
this.op = op;
}
@@ -48,7 +49,7 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new TreeIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
- recordDescProvider, op);
+ return new TreeIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, opCallbackProvider, partition,
+ fieldPermutation, recordDescProvider, op);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index ddef338..623487b 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
@@ -37,12 +38,12 @@
private IIndexAccessor indexAccessor;
public TreeIndexInsertUpdateDeleteOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
- IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
- IRecordDescriptorProvider recordDescProvider, IndexOp op) {
+ IHyracksTaskContext ctx, IOperationCallbackProvider opCallbackProvider, int partition,
+ int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOp op) {
// Only create the if insert operation is an insert.
boolean createIfNotExists = (op == IndexOp.INSERT);
treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition, createIfNotExists);
+ opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
this.recordDescProvider = recordDescProvider;
this.op = op;
tuple.setFieldPermutation(fieldPermutation);
@@ -83,6 +84,10 @@
indexAccessor.update(tuple);
break;
}
+ case UPSERT: {
+ indexAccessor.upsert(tuple);
+ break;
+ }
case DELETE: {
indexAccessor.delete(tuple);
break;
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
index 399643e..8557332 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
@@ -29,42 +29,43 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
public abstract class TreeIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
- protected TreeIndexDataflowHelper treeIndexHelper;
- protected FrameTupleAccessor accessor;
+ protected TreeIndexDataflowHelper treeIndexHelper;
+ protected FrameTupleAccessor accessor;
- protected ByteBuffer writeBuffer;
- protected FrameTupleAppender appender;
- protected ArrayTupleBuilder tb;
- protected DataOutput dos;
+ protected ByteBuffer writeBuffer;
+ protected FrameTupleAppender appender;
+ protected ArrayTupleBuilder tb;
+ protected DataOutput dos;
- protected ITreeIndex treeIndex;
- protected ISearchPredicate searchPred;
- protected IIndexCursor cursor;
- protected ITreeIndexFrame cursorFrame;
- protected IIndexAccessor indexAccessor;
+ protected ITreeIndex treeIndex;
+ protected ISearchPredicate searchPred;
+ protected IIndexCursor cursor;
+ protected ITreeIndexFrame cursorFrame;
+ protected IIndexAccessor indexAccessor;
- protected RecordDescriptor recDesc;
+ protected RecordDescriptor recDesc;
public TreeIndexSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, IRecordDescriptorProvider recordDescProvider) {
+ IOperationCallbackProvider opCallbackProvider, int partition, IRecordDescriptorProvider recordDescProvider) {
treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition, false);
+ opDesc, ctx, opCallbackProvider, partition, false);
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
}
-
+
protected abstract ISearchPredicate createSearchPredicate();
-
+
protected abstract void resetSearchPredicate(int tupleIndex);
-
+
protected IIndexCursor createCursor() {
return indexAccessor.createSearchCursor();
}
-
+
@Override
public void open() throws HyracksDataException {
accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
index c227e17..b7beec0 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
public class TreeIndexStatsOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -35,14 +36,14 @@
public TreeIndexStatsOperatorDescriptor(JobSpecification spec, IStorageManagerInterface storageManager,
IIndexRegistryProvider<IIndex> indexRegistryProvider, IFileSplitProvider fileSplitProvider,
ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories,
- IIndexDataflowHelperFactory dataflowHelperFactory) {
+ IIndexDataflowHelperFactory dataflowHelperFactory, IOperationCallbackProvider opCallbackProvider) {
super(spec, 0, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, dataflowHelperFactory);
+ comparatorFactories, dataflowHelperFactory, opCallbackProvider);
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new TreeIndexStatsOperatorNodePushable(this, ctx, partition);
+ return new TreeIndexStatsOperatorNodePushable(this, ctx, opCallbackProvider, partition);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index ff007af..2e8e300 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.util.TreeIndexStats;
import edu.uci.ics.hyracks.storage.am.common.util.TreeIndexStatsGatherer;
@@ -36,9 +37,9 @@
private TreeIndexStatsGatherer statsGatherer;
public TreeIndexStatsOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition) {
+ IOperationCallbackProvider opCallbackProvider, int partition) {
treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition, false);
+ opDesc, ctx, opCallbackProvider, partition, false);
this.ctx = ctx;
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallback.java
index cd7ca70..828dd81 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallback.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallback.java
@@ -21,18 +21,21 @@
/**
* Dummy operation callback that simply does nothing. Mainly, intended to be
* used in non-transaction access method testing.
- *
*/
public class NoOpOperationCallback implements IOperationCallback {
public static IOperationCallback INSTANCE = new NoOpOperationCallback();
+ private NoOpOperationCallback() {
+ }
+
@Override
public void pre(ITupleReference tuple) {
-
+ // Do nothing.
}
@Override
public void post(ITupleReference tuple) {
+ // Do nothing.
}
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallbackProvider.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallbackProvider.java
new file mode 100644
index 0000000..55dfb74
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallbackProvider.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.storage.am.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
+
+/**
+ * Dummy NoOp callback provider used primarily for testing. Always returns the
+ * {@link NoOpOperationCallback} instance.
+ *
+ * Implemented as an enum to preserve singleton model while being serializable
+ */
+public enum NoOpOperationCallbackProvider implements IOperationCallbackProvider {
+ INSTANCE;
+
+ @Override
+ public IOperationCallback getOperationCallback() {
+ return NoOpOperationCallback.INSTANCE;
+ }
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOp.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOp.java
index 4b5a492..77ad7ff 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOp.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOp.java
@@ -16,5 +16,5 @@
package edu.uci.ics.hyracks.storage.am.common.ophelpers;
public enum IndexOp {
- INSERT, DELETE, UPDATE, UPSERT, SEARCH, DISKORDERSCAN
+ INSERT, DELETE, UPDATE, UPSERT, SEARCH, DISKORDERSCAN, PHYSICALDELETE
}
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
index 5106d52..c4d6747 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndex;
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndex.InvertedIndexBulkLoadContext;
@@ -42,7 +43,7 @@
public InvertedIndexBulkLoadOperatorNodePushable(AbstractInvertedIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider) {
btreeDataflowHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
- .createIndexDataflowHelper(opDesc, ctx, partition, true);
+ .createIndexDataflowHelper(opDesc, ctx, NoOpOperationCallbackProvider.INSTANCE, partition, true);
invIndexDataflowHelper = new InvertedIndexDataflowHelper(btreeDataflowHelper, opDesc, ctx, partition, true);
this.recordDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
index 1bba574..ed9a6bd 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedListBuilder;
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.FixedSizeElementInvertedListBuilder;
@@ -33,7 +34,7 @@
public InvertedIndexDataflowHelper(TreeIndexDataflowHelper btreeDataflowHelper, IIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, boolean createIfNotExists) {
- super(opDesc, ctx, partition, createIfNotExists);
+ super(opDesc, ctx, NoOpOperationCallbackProvider.INSTANCE, partition, createIfNotExists);
this.btreeDataflowHelper = btreeDataflowHelper;
}
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
index 534bb65..8128e0d 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
@@ -32,6 +32,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexSearchModifier;
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndex;
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndexSearchPredicate;
@@ -45,11 +46,11 @@
private FrameTupleReference tuple;
private IRecordDescriptorProvider recordDescProvider;
private InvertedIndex invIndex;
-
+
private final InvertedIndexSearchPredicate searchPred;
private IIndexAccessor indexAccessor;
- private IIndexCursor resultCursor;
-
+ private IIndexCursor resultCursor;
+
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
private ArrayTupleBuilder tb;
@@ -62,7 +63,7 @@
IRecordDescriptorProvider recordDescProvider) {
this.opDesc = opDesc;
btreeDataflowHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
- .createIndexDataflowHelper(opDesc, ctx, partition, false);
+ .createIndexDataflowHelper(opDesc, ctx, NoOpOperationCallbackProvider.INSTANCE, partition, false);
invIndexDataflowHelper = new InvertedIndexDataflowHelper(btreeDataflowHelper, opDesc, ctx, partition, false);
this.queryField = queryField;
this.searchPred = new InvertedIndexSearchPredicate(searchModifier);
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
index 0ad8aa1..40195f1 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
@@ -20,12 +20,12 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
@@ -33,25 +33,26 @@
public class LSMBTreeDataflowHelper extends TreeIndexDataflowHelper {
private static int DEFAULT_MEM_PAGE_SIZE = 32768;
- private static int DEFAULT_MEM_NUM_PAGES = 1000;
-
+ private static int DEFAULT_MEM_NUM_PAGES = 1000;
+
private final int memPageSize;
- private final int memNumPages;
-
- public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- boolean createIfNotExists) {
- super(opDesc, ctx, partition, createIfNotExists);
+ private final int memNumPages;
+
+ public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+ super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
memPageSize = DEFAULT_MEM_PAGE_SIZE;
- memNumPages = DEFAULT_MEM_NUM_PAGES;
+ memNumPages = DEFAULT_MEM_NUM_PAGES;
}
-
- public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- boolean createIfNotExists, int memPageSize, int memNumPages) {
- super(opDesc, ctx, partition, createIfNotExists);
+
+ public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists, int memPageSize,
+ int memNumPages) {
+ super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
this.memPageSize = memPageSize;
this.memNumPages = memNumPages;
}
-
+
@Override
public ITreeIndex createIndexInstance() throws HyracksDataException {
ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
@@ -63,10 +64,9 @@
file.delete();
}
InMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(memNumPages, metaDataFrameFactory);
- // TODO: Figure out where to get the proper operation callback from.
- return LSMBTreeUtils.createLSMTree(memBufferCache, NoOpOperationCallback.INSTANCE, memFreePageManager, (IOManager) ctx.getIOManager(), file
- .getFile().getPath(), opDesc.getStorageManager().getBufferCache(ctx), opDesc
- .getStorageManager().getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(), treeOpDesc
- .getTreeIndexComparatorFactories());
+ return LSMBTreeUtils.createLSMTree(memBufferCache, opCallbackProvider.getOperationCallback(),
+ memFreePageManager, (IOManager) ctx.getIOManager(), file.getFile().getPath(), opDesc
+ .getStorageManager().getBufferCache(ctx), opDesc.getStorageManager().getFileMapProvider(ctx),
+ treeOpDesc.getTreeIndexTypeTraits(), treeOpDesc.getTreeIndexComparatorFactories());
}
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
index da00694..98cfeed 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
@@ -26,7 +27,7 @@
@Override
public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, boolean createIfNotExists) {
- return new LSMBTreeDataflowHelper(opDesc, ctx, partition, createIfNotExists);
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+ return new LSMBTreeDataflowHelper(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
}
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index dee8072..454db50 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -26,9 +26,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
@@ -60,9 +57,9 @@
public class LSMBTree implements ILSMIndex, ITreeIndex {
protected final Logger LOGGER = Logger.getLogger(LSMBTree.class.getName());
-
+
private final LSMHarness lsmHarness;
-
+
// In-memory components.
private final BTree memBTree;
private final InMemoryFreePageManager memFreePageManager;
@@ -74,24 +71,25 @@
// For creating BTree's used in bulk load. Different from diskBTreeFactory
// because it should have a different tuple writer in it's leaf frames.
private final BTreeFactory bulkLoadBTreeFactory;
- private final IBufferCache diskBufferCache;
+ private final IBufferCache diskBufferCache;
private final IFileMapProvider diskFileMapProvider;
// List of BTree instances. Using Object for better sharing via ILSMTree + LSMHarness.
private LinkedList<Object> diskBTrees = new LinkedList<Object>();
// Helps to guarantees physical consistency of LSM components.
private final ILSMComponentFinalizer componentFinalizer;
-
+
// Common for in-memory and on-disk components.
private final ITreeIndexFrameFactory insertLeafFrameFactory;
private final ITreeIndexFrameFactory deleteLeafFrameFactory;
private final IBinaryComparatorFactory[] cmpFactories;
-
+
private boolean isOpen = false;
-
+
public LSMBTree(IBufferCache memBufferCache, IOperationCallback memOpCallback, InMemoryFreePageManager memFreePageManager,
ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
- ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileManager fileNameManager, BTreeFactory diskBTreeFactory,
- BTreeFactory bulkLoadBTreeFactory, IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] cmpFactories) {
+ ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileManager fileNameManager,
+ BTreeFactory diskBTreeFactory, BTreeFactory bulkLoadBTreeFactory, IFileMapProvider diskFileMapProvider,
+ int fieldCount, IBinaryComparatorFactory[] cmpFactories) {
memBTree = new BTree(memBufferCache, memOpCallback, fieldCount, cmpFactories, memFreePageManager, interiorFrameFactory,
insertLeafFrameFactory);
this.memFreePageManager = memFreePageManager;
@@ -113,7 +111,7 @@
memBTree.create(indexFileId);
fileManager.createDirs();
}
-
+
/**
* Opens LSMBTree, cleaning up invalid files from base dir, and registering
* all valid files as on-disk BTrees.
@@ -124,14 +122,14 @@
*/
@Override
public void open(int indexFileId) throws HyracksDataException {
- synchronized(this) {
+ synchronized (this) {
if (isOpen) {
return;
}
memBTree.open(indexFileId);
BTree dummyBTree = diskBTreeFactory.createIndexInstance();
List<Object> validFileNames = fileManager.cleanupAndGetValidFiles(dummyBTree, componentFinalizer);
- for (Object o : validFileNames) {
+ for (Object o : validFileNames) {
String fileName = (String) o;
FileReference fileRef = new FileReference(new File(fileName));
BTree btree = createDiskBTree(diskBTreeFactory, fileRef, false);
@@ -143,12 +141,12 @@
@Override
public void close() throws HyracksDataException {
- synchronized(this) {
+ synchronized (this) {
if (!isOpen) {
return;
}
for (Object o : diskBTrees) {
- BTree btree = (BTree) o;
+ BTree btree = (BTree) o;
diskBufferCache.closeFile(btree.getFileId());
diskBufferCache.deleteFile(btree.getFileId(), false);
btree.close();
@@ -160,106 +158,17 @@
}
@Override
- public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException, TreeIndexException {
+ public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
+ TreeIndexException {
LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
- // TODO: This will become much simpler once the BTree supports a true upsert operation.
- try {
- ctx.memBTreeAccessor.insert(tuple);
- } catch (BTreeDuplicateKeyException e) {
- // Notice that a flush must wait for the current operation to
- // finish (threadRefCount must reach zero).
- // TODO: The methods below are very inefficient, we'd rather like
- // to flip the antimatter bit one single BTree traversal.
- if (ctx.getIndexOp() == IndexOp.DELETE) {
- return deleteExistingKey(tuple, ctx);
- } else {
- return insertOrUpdateExistingKey(tuple, ctx);
- }
- }
- return true;
- }
-
- private boolean deleteExistingKey(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
- // We assume that tuple given by the user for deletion only contains the
- // key fields, but not any non-key fields.
- // Therefore, to set the delete bit in the tuple that already exist in
- // the BTree, we must retrieve the original tuple first. This is to
- // ensure that we have the proper value field.
- if (cmpFactories.length != memBTree.getFieldCount()) {
- ctx.reset(IndexOp.SEARCH);
- RangePredicate rangePredicate = new RangePredicate(tuple, tuple, true, true, ctx.cmp, ctx.cmp);
- ITreeIndexCursor cursor = ctx.memBTreeAccessor.createSearchCursor();
- ctx.memBTreeAccessor.search(cursor, rangePredicate);
- ITupleReference tupleCopy = null;
- try {
- if (cursor.hasNext()) {
- cursor.next();
- tupleCopy = TupleUtils.copyTuple(cursor.getTuple());
- }
- } finally {
- cursor.close();
- }
- // This means the tuple we are looking for must have been truly deleted by another thread.
- // Simply restart the original operation to insert the antimatter tuple.
- // There is a remote chance of livelocks due to this behavior.
- if (tupleCopy == null) {
- ctx.reset(IndexOp.DELETE);
- return false;
- }
- return memBTreeUpdate(tupleCopy, ctx);
- } else {
- // Since the existing tuple could be a matter tuple, we must delete it and re-insert.
- return memBTreeDeleteAndReinsert(tuple, ctx);
- }
- }
-
- private boolean insertOrUpdateExistingKey(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
- TreeIndexException {
- // If all fields are keys, and the key we are trying to insert/update
- // already exists, then we are already done.
- // Otherwise, we must update the non-key fields.
- if (cmpFactories.length != memBTree.getFieldCount()) {
- return memBTreeUpdate(tuple, ctx);
- } else {
- // Since the existing tuple could be an antimatter tuple, we must delete it and re-insert.
- return memBTreeDeleteAndReinsert(tuple, ctx);
- }
- }
-
- private boolean memBTreeDeleteAndReinsert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
- TreeIndexException {
- // All fields are key fields, therefore a true BTree update is not
- // allowed.
- // In order to set/unset the antimatter bit, we
- // must truly delete the existing tuple from the BTree, and then
- // re-insert it (with the antimatter bit set/unset).
- // Since the tuple given by the user already has all fields, we
- // don't need to retrieve the already existing tuple.
- IndexOp originalOp = ctx.getIndexOp();
- try {
+
+ if(ctx.getIndexOp() == IndexOp.PHYSICALDELETE) {
ctx.memBTreeAccessor.delete(tuple);
- } catch (BTreeNonExistentKeyException e) {
- // Tuple has been deleted in the meantime. We will restart
- // our operation anyway.
+ return true;
}
- // Restart performOp to insert the tuple.
- ctx.reset(originalOp);
- return false;
- }
-
- private boolean memBTreeUpdate(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
- TreeIndexException {
- IndexOp originalOp = ctx.getIndexOp();
- try {
- ctx.reset(IndexOp.UPDATE);
- ctx.memBTreeAccessor.update(tuple);
- } catch (BTreeNonExistentKeyException e) {
- // It is possible that the key has truly been deleted.
- // Simply restart the operation. There is a remote chance of
- // livelocks due to this behavior.
- ctx.reset(originalOp);
- return false;
- }
+
+ ctx.memBTreeAccessor.upsert(tuple);
+
return true;
}
@@ -289,36 +198,38 @@
public void addFlushedComponent(Object index) {
diskBTrees.addFirst(index);
}
-
+
@Override
public void resetInMemoryComponent() throws HyracksDataException {
memFreePageManager.reset();
memBTree.create(memBTree.getFileId());
}
-
+
private BTree createBulkLoadTarget() throws HyracksDataException {
String relFlushFileName = (String) fileManager.getRelFlushFileName();
FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
return createDiskBTree(bulkLoadBTreeFactory, fileRef, true);
}
-
+
private BTree createFlushTarget() throws HyracksDataException {
String relFlushFileName = (String) fileManager.getRelFlushFileName();
FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
return createDiskBTree(diskBTreeFactory, fileRef, true);
}
-
+
private BTree createMergeTarget(List<Object> mergingDiskBTrees) throws HyracksDataException {
BTree firstBTree = (BTree) mergingDiskBTrees.get(0);
BTree lastBTree = (BTree) mergingDiskBTrees.get(mergingDiskBTrees.size() - 1);
FileReference firstFile = diskFileMapProvider.lookupFileName(firstBTree.getFileId());
FileReference lastFile = diskFileMapProvider.lookupFileName(lastBTree.getFileId());
- String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile.getFile().getName());
+ String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile
+ .getFile().getName());
FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
return createDiskBTree(diskBTreeFactory, fileRef, true);
}
-
- private BTree createDiskBTree(BTreeFactory factory, FileReference fileRef, boolean createBTree) throws HyracksDataException {
+
+ private BTree createDiskBTree(BTreeFactory factory, FileReference fileRef, boolean createBTree)
+ throws HyracksDataException {
// File will be deleted during cleanup of merge().
diskBufferCache.createFile(fileRef);
int diskBTreeFileId = diskFileMapProvider.lookupFileId(fileRef);
@@ -333,16 +244,17 @@
diskBTree.open(diskBTreeFileId);
return diskBTree;
}
-
- public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException {
+
+ public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
+ boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException {
LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
LSMBTreeRangeSearchCursor lsmTreeCursor = (LSMBTreeRangeSearchCursor) cursor;
int numDiskBTrees = diskComponents.size();
- int numBTrees = (includeMemComponent) ? numDiskBTrees + 1 : numDiskBTrees;
- LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees,
- insertLeafFrameFactory, ctx.cmp, includeMemComponent, searcherRefCount, lsmHarness);
+ int numBTrees = (includeMemComponent) ? numDiskBTrees + 1 : numDiskBTrees;
+ LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees, insertLeafFrameFactory,
+ ctx.cmp, includeMemComponent, searcherRefCount, lsmHarness);
lsmTreeCursor.open(initialState, pred);
-
+
int cursorIx;
if (includeMemComponent) {
// Open cursor of in-memory BTree at index 0.
@@ -352,12 +264,12 @@
} else {
cursorIx = 0;
}
-
+
// Open cursors of on-disk BTrees.
ITreeIndexAccessor[] diskBTreeAccessors = new ITreeIndexAccessor[numDiskBTrees];
int diskBTreeIx = 0;
ListIterator<Object> diskBTreesIter = diskComponents.listIterator();
- while(diskBTreesIter.hasNext()) {
+ while (diskBTreesIter.hasNext()) {
BTree diskBTree = (BTree) diskBTreesIter.next();
diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor();
diskBTreeAccessors[diskBTreeIx].search(lsmTreeCursor.getCursor(cursorIx), pred);
@@ -366,8 +278,8 @@
}
lsmTreeCursor.initPriorityQueue();
}
-
- public ITreeIndex merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
+
+ public ITreeIndex merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
LSMBTreeOpContext ctx = createOpContext();
ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor();
RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
@@ -376,13 +288,13 @@
// merged now, so we can clean them up after the merge has completed.
List<Object> mergingDiskBTrees = lsmHarness.search(cursor, (RangePredicate) rangePred, ctx, false);
mergedComponents.addAll(mergingDiskBTrees);
-
+
// Nothing to merge.
if (mergedComponents.size() <= 1) {
cursor.close();
return null;
}
-
+
// Bulk load the tuples from all on-disk BTrees into the new BTree.
BTree mergedBTree = createMergeTarget(mergingDiskBTrees);
IIndexBulkLoadContext bulkLoadCtx = mergedBTree.beginBulkLoad(1.0f);
@@ -395,16 +307,16 @@
} finally {
cursor.close();
}
- mergedBTree.endBulkLoad(bulkLoadCtx);
+ mergedBTree.endBulkLoad(bulkLoadCtx);
return mergedBTree;
}
-
+
@Override
public void addMergedComponent(Object newComponent, List<Object> mergedComponents) {
diskBTrees.removeAll(mergedComponents);
diskBTrees.addLast(newComponent);
}
-
+
@Override
public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException {
for (Object o : mergedComponents) {
@@ -416,7 +328,7 @@
fileRef.getFile().delete();
}
}
-
+
@Override
public InMemoryFreePageManager getInMemoryFreePageManager() {
return (InMemoryFreePageManager) memBTree.getFreePageManager();
@@ -426,11 +338,11 @@
public List<Object> getDiskComponents() {
return diskBTrees;
}
-
+
public class LSMTreeBulkLoadContext implements IIndexBulkLoadContext {
private final BTree btree;
private IIndexBulkLoadContext bulkLoadCtx;
-
+
public LSMTreeBulkLoadContext(BTree btree) {
this.btree = btree;
}
@@ -438,20 +350,20 @@
public void beginBulkLoad(float fillFactor) throws HyracksDataException, TreeIndexException {
bulkLoadCtx = btree.beginBulkLoad(fillFactor);
}
-
+
public BTree getBTree() {
return btree;
}
-
+
public IIndexBulkLoadContext getBulkLoadCtx() {
return bulkLoadCtx;
}
}
-
+
@Override
public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
BTree diskBTree = createBulkLoadTarget();
- LSMTreeBulkLoadContext bulkLoadCtx = new LSMTreeBulkLoadContext(diskBTree);
+ LSMTreeBulkLoadContext bulkLoadCtx = new LSMTreeBulkLoadContext(diskBTree);
bulkLoadCtx.beginBulkLoad(fillFactor);
return bulkLoadCtx;
}
@@ -503,11 +415,11 @@
public int getFileId() {
return memBTree.getFileId();
}
-
+
public IBinaryComparatorFactory[] getComparatorFactories() {
return cmpFactories;
}
-
+
public LSMBTreeOpContext createOpContext() {
return new LSMBTreeOpContext(memBTree, insertLeafFrameFactory, deleteLeafFrameFactory);
}
@@ -516,7 +428,7 @@
public IIndexAccessor createAccessor() {
return new LSMBTreeIndexAccessor(lsmHarness, createOpContext());
}
-
+
public class LSMBTreeIndexAccessor extends LSMTreeIndexAccessor {
public LSMBTreeIndexAccessor(LSMHarness lsmHarness, IIndexOpContext ctx) {
super(lsmHarness, ctx);
@@ -526,7 +438,7 @@
public IIndexCursor createSearchCursor() {
return new LSMBTreeRangeSearchCursor();
}
-
+
public MultiComparator getMultiComparator() {
LSMBTreeOpContext concreteCtx = (LSMBTreeOpContext) ctx;
return concreteCtx.cmp;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index a688b5b..6f0f866 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
@@ -43,4 +44,12 @@
* @throws TreeIndexException
*/
public void merge() throws HyracksDataException, IndexException;
+
+ /**
+ * Deletes the tuple from the memory component only.
+ *
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
+ public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 28cea26..de2bdcd 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -73,4 +73,10 @@
public void merge() throws HyracksDataException, IndexException {
lsmHarness.merge();
}
+
+ @Override
+ public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.reset(IndexOp.PHYSICALDELETE);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ }
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
index 4f9280d..52ae64a 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
@@ -44,20 +45,21 @@
private final IBinaryComparatorFactory[] btreeComparatorFactories;
private final IPrimitiveValueProviderFactory[] valueProviderFactories;
- public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- boolean createIfNotExists, IBinaryComparatorFactory[] btreeComparatorFactories,
- IPrimitiveValueProviderFactory[] valueProviderFactories) {
- super(opDesc, ctx, partition, createIfNotExists);
+ public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists,
+ IBinaryComparatorFactory[] btreeComparatorFactories, IPrimitiveValueProviderFactory[] valueProviderFactories) {
+ super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
memPageSize = DEFAULT_MEM_PAGE_SIZE;
memNumPages = DEFAULT_MEM_NUM_PAGES;
this.btreeComparatorFactories = btreeComparatorFactories;
this.valueProviderFactories = valueProviderFactories;
}
- public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- boolean createIfNotExists, int memPageSize, int memNumPages,
- IBinaryComparatorFactory[] btreeComparatorFactories, IPrimitiveValueProviderFactory[] valueProviderFactories) {
- super(opDesc, ctx, partition, createIfNotExists);
+ public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists, int memPageSize,
+ int memNumPages, IBinaryComparatorFactory[] btreeComparatorFactories,
+ IPrimitiveValueProviderFactory[] valueProviderFactories) {
+ super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
this.memPageSize = memPageSize;
this.memNumPages = memNumPages;
this.btreeComparatorFactories = btreeComparatorFactories;
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
index 573a0f7..b0f9377 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -37,8 +38,8 @@
@Override
public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, boolean createIfNotExists) {
- return new LSMRTreeDataflowHelper(opDesc, ctx, partition, createIfNotExists, btreeComparatorFactories,
- valueProviderFactories);
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+ return new LSMRTreeDataflowHelper(opDesc, ctx, opCallbackProvider, partition, createIfNotExists,
+ btreeComparatorFactories, valueProviderFactories);
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 803706c..a1e25ae 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -288,6 +288,10 @@
public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
TreeIndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
+ if (ctx.getIndexOp() == IndexOp.PHYSICALDELETE) {
+ throw new UnsupportedOperationException("Physical delete not yet supported in LSM R-tree");
+ }
+
if (ctx.getIndexOp() == IndexOp.INSERT) {
// Before each insert, we must check whether there exist a killer
// tuple in the memBTree. If we find a killer tuple, we must truly
@@ -342,9 +346,8 @@
return true;
}
- public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred,
- IIndexOpContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount)
- throws HyracksDataException, IndexException {
+ public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
+ boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
int numDiskTrees = diskComponents.size();
int numTrees = (includeMemComponent) ? numDiskTrees + 1 : numDiskTrees;
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
index 99a349a..6f2b3e9 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -27,9 +28,10 @@
private final IPrimitiveValueProviderFactory[] valueProviderFactories;
- public RTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- boolean createIfNotExists, IPrimitiveValueProviderFactory[] valueProviderFactories) {
- super(opDesc, ctx, partition, createIfNotExists);
+ public RTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists,
+ IPrimitiveValueProviderFactory[] valueProviderFactories) {
+ super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
this.valueProviderFactories = valueProviderFactories;
}
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
index c76aa63..4871ce9 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -26,14 +27,15 @@
private static final long serialVersionUID = 1L;
private final IPrimitiveValueProviderFactory[] valueProviderFactories;
-
+
public RTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories) {
this.valueProviderFactories = valueProviderFactories;
}
-
+
@Override
public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, boolean createIfNotExists) {
- return new RTreeDataflowHelper(opDesc, ctx, partition, createIfNotExists, valueProviderFactories);
+ IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+ return new RTreeDataflowHelper(opDesc, ctx, opCallbackProvider, partition, createIfNotExists,
+ valueProviderFactories);
}
}
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
index 843eb34..d797ff2 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -39,15 +40,16 @@
IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
- IIndexDataflowHelperFactory dataflowHelperFactory) {
+ IIndexDataflowHelperFactory dataflowHelperFactory, IOperationCallbackProvider opCallbackProvider) {
super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, dataflowHelperFactory);
+ comparatorFactories, dataflowHelperFactory, opCallbackProvider);
this.keyFields = keyFields;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new RTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, keyFields);
+ return new RTreeSearchOperatorNodePushable(this, ctx, opCallbackProvider, partition, recordDescProvider,
+ keyFields);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index c5a36c9..0dde9b6 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
@@ -30,8 +31,9 @@
protected MultiComparator cmp;
public RTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, IRecordDescriptorProvider recordDescProvider, int[] keyFields) {
- super(opDesc, ctx, partition, recordDescProvider);
+ IOperationCallbackProvider opCallbackProvider, int partition, IRecordDescriptorProvider recordDescProvider,
+ int[] keyFields) {
+ super(opDesc, ctx, opCallbackProvider, partition, recordDescProvider);
if (keyFields != null && keyFields.length > 0) {
searchKey = new PermutingFrameTupleReference();
searchKey.setFieldPermutation(keyFields);