merged from fullstack_asterix_stabilization to fullstack_lsm_staging -r3100:3171
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3173 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix-core/pom.xml b/pregelix-core/pom.xml
index 2f167fb..f084614 100644
--- a/pregelix-core/pom.xml
+++ b/pregelix-core/pom.xml
@@ -311,5 +311,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>
+ hyracks-storage-am-lsm-invertedindex
+ </artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 0b1be61..77fd1a7 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -52,13 +52,13 @@
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -80,8 +80,8 @@
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
@@ -93,7 +93,7 @@
protected static final String PRIMARY_INDEX = "primary";
protected final Configuration conf;
protected final PregelixJob giraphJob;
- protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+ protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
protected int frameSize = ClusterConfig.getFrameSize();
@@ -169,8 +169,9 @@
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeCreate);
return spec;
}
@@ -229,9 +230,9 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
@@ -356,8 +357,8 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -424,9 +425,10 @@
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
+
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -455,8 +457,8 @@
JobSpecification spec = new JobSpecification();
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
- TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider);
+ IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
+ lcManagerProvider, fileSplitProvider, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, drop);
spec.addRoot(drop);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 9de4c04..fe2fcac 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -40,8 +40,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -135,7 +135,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 6,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -166,8 +166,8 @@
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
WritableComparator.get(vertexIdClass).getClass());
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProvider, typeTraits,
- indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
@@ -222,18 +222,18 @@
* add the insert operator to insert vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -343,7 +343,7 @@
ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
typeTraits));
IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), true);
ClusterConfig.setLocationConstraint(spec, setUnion);
@@ -361,8 +361,8 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
ClusterConfig.setLocationConstraint(spec, join);
@@ -377,7 +377,7 @@
String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderWrite, typeTraits,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
@@ -446,18 +446,18 @@
* add the insert operator to insert vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 91c15b2..f1eceb7 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -40,8 +40,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -129,7 +129,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -205,18 +205,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -335,7 +335,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -408,18 +408,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ee1fd0f..314c393 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -39,8 +39,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -131,7 +131,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -197,18 +197,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -323,7 +323,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -384,18 +384,18 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 628e9ce..0c3db38 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -39,8 +39,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -128,7 +128,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -211,18 +211,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -337,7 +337,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -417,18 +417,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 572bff9..3c00cad 100644
--- a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -50,14 +50,14 @@
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
@@ -65,8 +65,8 @@
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
public class JoinTest {
private final static String ACTUAL_RESULT_DIR = "actual";
@@ -82,7 +82,7 @@
private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
private static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
- private IIndexRegistryProvider<IIndex> treeRegistry = TreeIndexRegistryProvider.INSTANCE;
+ private IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
private IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
private IBinaryHashFunctionFactory stringHashFactory = new PointableBinaryHashFunctionFactory(
@@ -193,8 +193,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -232,8 +232,9 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
TreeIndexCreateOperatorDescriptor writer = new TreeIndexCreateOperatorDescriptor(spec, storageManagerInterface,
- treeRegistry, fileSplitProvider, typeTraits, comparatorFactories, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
spec.addRoot(writer);
runTest(spec);
@@ -276,9 +277,9 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
@@ -351,8 +352,8 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
- storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, keyComparatorFactories, true,
- keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, keyComparatorFactories,
+ true, keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
/** results (already in sorted order) */
@@ -360,8 +361,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -457,8 +458,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -554,7 +555,7 @@
ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
typeTraits));
IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
- storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), true, nullWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
@@ -564,8 +565,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
index 99e55f1..c9f3fe7 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
@@ -23,12 +23,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -53,14 +53,16 @@
private final int outputArity;
public BTreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory,
IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, dataflowHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, dataflowHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
index 3938613..ff95e52 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -106,11 +107,11 @@
* open the function
*/
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexHelper.init(false);
- btree = (BTree) treeIndexHelper.getIndex();
+ treeIndexHelper.open();
+ btree = (BTree) treeIndexHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -120,17 +121,17 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
- writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexHelper.deinit();
+ treeIndexHelper.close();
throw new HyracksDataException(e);
}
}
@@ -203,7 +204,7 @@
*/
functionProxy.functionClose();
} finally {
- treeIndexHelper.deinit();
+ treeIndexHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
index 60559e8..7662aa8 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
@@ -25,13 +25,13 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -61,14 +61,16 @@
private final int outputArity;
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, highKeyFields, opHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -88,7 +90,7 @@
}
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -96,8 +98,10 @@
boolean isRightOuter, INullWriterFactory[] nullWriterFactories, IRecordDescriptorFactory inputRdFactory,
int outputArity, IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, highKeyFields, opHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -120,7 +124,7 @@
}
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -128,8 +132,10 @@
boolean isSetUnion, IRecordDescriptorFactory inputRdFactory, int outputArity,
IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, highKeyFields, opHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 37029f3..61e4649 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -104,12 +105,11 @@
* open the function
*/
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
- btree.open(treeIndexOpHelper.getIndexFileId());
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -140,15 +140,15 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -220,7 +220,7 @@
*/
functionProxy.functionClose();
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
index ed177e3..d237761 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
@@ -24,13 +24,13 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
public class IndexNestedLoopJoinOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -51,12 +51,13 @@
private boolean isSetUnion = false;
public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory) {
- super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -65,14 +66,15 @@
}
public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
boolean isRightOuter, INullWriterFactory[] nullWriterFactories) {
- super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -84,14 +86,15 @@
}
public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
boolean isSetUnion) {
- super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
index bd076d3..8b9bfc2 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
@@ -34,9 +34,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public class IndexNestedLoopJoinOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -86,11 +87,11 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
writer.open();
int lowKeySearchFields = btree.getComparatorFactories().length;
@@ -118,15 +119,15 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
setCursor();
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -196,7 +197,7 @@
throw new HyracksDataException(e);
}
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index f7b3d62..5ca5382 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -38,9 +38,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -116,11 +117,11 @@
* function open
*/
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -147,7 +148,7 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
dos = nullTupleBuilder.getDataOutput();
@@ -157,10 +158,10 @@
nullTupleBuilder.addFieldEndOffset();
}
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -178,7 +179,7 @@
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -273,7 +274,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index 9f1e1ad..d7c5d1f 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -38,9 +38,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public class IndexNestedLoopRightOuterJoinOperatorNodePushable extends
AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -97,10 +98,10 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
writer.open();
@@ -129,13 +130,13 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -151,7 +152,7 @@
}
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -243,7 +244,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 6af60a8..160324e 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -104,11 +105,11 @@
@Override
public void open() throws HyracksDataException {
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -120,11 +121,11 @@
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -141,7 +142,7 @@
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -214,7 +215,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index 615a25b..579935b 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -37,9 +37,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public class IndexNestedLoopSetUnionOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -90,11 +91,11 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
writer.open();
@@ -107,13 +108,13 @@
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -129,7 +130,7 @@
}
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -203,7 +204,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
index 126fcb8..eb5ece6 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
@@ -22,12 +22,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
public class TreeIndexBulkReLoadOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -35,20 +35,21 @@
private final int[] fieldPermutation;
private final IStorageManagerInterface storageManager;
- private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
+ private final IIndexLifecycleManagerProvider lcManagerProvider;
private final IFileSplitProvider fileSplitProvider;
private final float fillFactor;
public TreeIndexBulkReLoadOperatorDescriptor(JobSpecification spec, IStorageManagerInterface storageManager,
- IIndexRegistryProvider<IIndex> treeIndexRegistryProvider, IFileSplitProvider fileSplitProvider,
+ IIndexLifecycleManagerProvider lcManagerProvider, IFileSplitProvider fileSplitProvider,
ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation,
float fillFactor, IIndexDataflowHelperFactory opHelperFactory) {
- super(spec, 1, 0, null, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 0, null, storageManager, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.fieldPermutation = fieldPermutation;
this.storageManager = storageManager;
- this.treeIndexRegistryProvider = treeIndexRegistryProvider;
+ this.lcManagerProvider = lcManagerProvider;
this.fileSplitProvider = fileSplitProvider;
this.fillFactor = fillFactor;
}
@@ -57,6 +58,6 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new TreeIndexBulkReLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor,
- recordDescProvider, storageManager, treeIndexRegistryProvider, fileSplitProvider);
+ recordDescProvider, storageManager, lcManagerProvider, fileSplitProvider);
}
}
\ No newline at end of file
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
index 883fef4..5e089a5 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -20,111 +20,54 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public class TreeIndexBulkReLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
- private final TreeIndexDataflowHelper treeIndexOpHelper;
- private FrameTupleAccessor accessor;
- private IIndexBulkLoadContext bulkLoadCtx;
-
- private IRecordDescriptorProvider recordDescProvider;
- private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
-
- private final IStorageManagerInterface storageManager;
- private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
- private final IFileSplitProvider fileSplitProvider;
- private final int partition;
private final float fillFactor;
- private IHyracksTaskContext ctx;
+ private final TreeIndexDataflowHelper treeIndexOpHelper;
+ private final IIndexOperatorDescriptor opDesc;
+ private final IRecordDescriptorProvider recordDescProvider;
+ private final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+
private ITreeIndex index;
+ private FrameTupleAccessor accessor;
+ private IIndexBulkLoader bulkLoader;
public TreeIndexBulkReLoadOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider) {
+ this.fillFactor = fillFactor;
treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
+ this.opDesc = opDesc;
this.recordDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
-
- this.storageManager = storageManager;
- this.treeIndexRegistryProvider = treeIndexRegistryProvider;
- this.fileSplitProvider = fileSplitProvider;
- this.partition = partition;
- this.ctx = ctx;
- this.fillFactor = fillFactor;
}
@Override
public void open() throws HyracksDataException {
- initDrop();
- init();
- }
-
- private void initDrop() throws HyracksDataException {
- try {
- IndexRegistry<IIndex> treeIndexRegistry = treeIndexRegistryProvider.getRegistry(ctx);
- IBufferCache bufferCache = storageManager.getBufferCache(ctx);
- IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
-
- FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
- int indexFileId = -1;
- boolean fileIsMapped = false;
- synchronized (fileMapProvider) {
- fileIsMapped = fileMapProvider.isMapped(f);
- if (fileIsMapped)
- indexFileId = fileMapProvider.lookupFileId(f);
- }
-
- /**
- * delete the file if it is mapped
- */
- if (fileIsMapped) {
- // Unregister tree instance.
- synchronized (treeIndexRegistry) {
- treeIndexRegistry.unregister(indexFileId);
- }
-
- // remove name to id mapping
- bufferCache.deleteFile(indexFileId, false);
- }
- }
- // TODO: for the time being we don't throw,
- // with proper exception handling (no hanging job problem) we should
- // throw
- catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- private void init() throws HyracksDataException {
- AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexOpHelper
- .getOperatorDescriptor();
RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
+ treeIndexOpHelper.create();
+ treeIndexOpHelper.open();
try {
- treeIndexOpHelper.init(true);
- treeIndexOpHelper.getIndex().open(treeIndexOpHelper.getIndexFileId());
- index = (ITreeIndex) treeIndexOpHelper.getIndex();
- index.open(treeIndexOpHelper.getIndexFileId());
- bulkLoadCtx = index.beginBulkLoad(fillFactor);
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
+ bulkLoader = index.createBulkLoader(fillFactor, false, 0);
} catch (Exception e) {
// cleanup in case of failure
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -135,16 +78,22 @@
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
tuple.reset(accessor, i);
- index.bulkLoadAddTuple(tuple, bulkLoadCtx);
+ try {
+ bulkLoader.add(tuple);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
}
}
@Override
public void close() throws HyracksDataException {
try {
- index.endBulkLoad(bulkLoadCtx);
+ bulkLoader.end();
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 567e220..8d6ab38 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -28,8 +28,8 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -38,20 +38,25 @@
import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-import edu.uci.ics.hyracks.storage.common.smi.TransientFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
+import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceRepository;
import edu.uci.ics.pregelix.api.graph.Vertex;
public class RuntimeContext implements IWorkspaceFileFactory {
private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
- private IndexRegistry<IIndex> treeIndexRegistry;
- private IBufferCache bufferCache;
- private IFileMapManager fileMapManager;
- private Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
- private Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
- private Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
- private IOManager ioManager;
- private Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+ private final IIndexLifecycleManager lcManager;
+ private final ILocalResourceRepository localResourceRepository;
+ private final ResourceIdFactory resourceIdFactory;
+ private final IBufferCache bufferCache;
+ private final IFileMapManager fileMapManager;
+ private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
+ private final Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+ private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
+ private final IOManager ioManager;
+ private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
public RuntimeContext(INCApplicationContext appCtx) {
fileMapManager = new TransientFileMapManager();
@@ -64,8 +69,10 @@
/** let the buffer cache never flush dirty pages */
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000);
- treeIndexRegistry = new IndexRegistry<IIndex>();
ioManager = (IOManager) appCtx.getRootContext().getIOManager();
+ lcManager = new IndexLifecycleManager();
+ localResourceRepository = new TransientLocalResourceRepository();
+ resourceIdFactory = new ResourceIdFactory(0);
}
public void close() {
@@ -80,6 +87,18 @@
System.gc();
}
+ public ILocalResourceRepository getLocalResourceRepository() {
+ return localResourceRepository;
+ }
+
+ public ResourceIdFactory getResourceIdFactory() {
+ return resourceIdFactory;
+ }
+
+ public IIndexLifecycleManager getIndexLifecycleManager() {
+ return lcManager;
+ }
+
public IBufferCache getBufferCache() {
return bufferCache;
}
@@ -88,10 +107,6 @@
return fileMapManager;
}
- public IndexRegistry<IIndex> getTreeIndexRegistry() {
- return treeIndexRegistry;
- }
-
public Map<StateKey, IStateObject> getAppStateStore() {
return appStateMap;
}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java
new file mode 100644
index 0000000..4fce6b3
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+public class IndexLifeCycleManagerProvider implements IIndexLifecycleManagerProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IIndexLifecycleManagerProvider INSTANCE = new IndexLifeCycleManagerProvider();
+
+ private IndexLifeCycleManagerProvider() {
+ }
+
+ @Override
+ public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+ return RuntimeContext.get(ctx).getIndexLifecycleManager();
+ }
+
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
index 57bbfbe..0cce59d 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
@@ -18,6 +18,8 @@
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
public class StorageManagerInterface implements IStorageManagerInterface {
@@ -37,4 +39,14 @@
public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
return RuntimeContext.get(ctx).getFileMapManager();
}
+
+ @Override
+ public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) {
+ return RuntimeContext.get(ctx).getLocalResourceRepository();
+ }
+
+ @Override
+ public ResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
+ return RuntimeContext.get(ctx).getResourceIdFactory();
+ }
}
\ No newline at end of file
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
deleted file mode 100644
index 7d66422..0000000
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.runtime.bootstrap;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
-import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
-
-public class TreeIndexRegistryProvider implements IIndexRegistryProvider<IIndex> {
- private static final long serialVersionUID = 1L;
-
- public static final TreeIndexRegistryProvider INSTANCE = new TreeIndexRegistryProvider();
-
- private TreeIndexRegistryProvider() {
- }
-
- @Override
- public IndexRegistry<IIndex> getRegistry(IHyracksTaskContext ctx) {
- return RuntimeContext.get(ctx).getTreeIndexRegistry();
- }
-}
\ No newline at end of file