merge r3038:3100 fullstack_asterix_stabilization -> fullstack_lsm_staging
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3106 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix-core/pom.xml b/pregelix-core/pom.xml
index 5238068..4c63452 100644
--- a/pregelix-core/pom.xml
+++ b/pregelix-core/pom.xml
@@ -310,5 +310,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>
+ hyracks-storage-am-lsm-invertedindex
+ </artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 0b1be61..77fd1a7 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -52,13 +52,13 @@
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -80,8 +80,8 @@
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
@@ -93,7 +93,7 @@
protected static final String PRIMARY_INDEX = "primary";
protected final Configuration conf;
protected final PregelixJob giraphJob;
- protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+ protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
protected int frameSize = ClusterConfig.getFrameSize();
@@ -169,8 +169,9 @@
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeCreate);
return spec;
}
@@ -229,9 +230,9 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
@@ -356,8 +357,8 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -424,9 +425,10 @@
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
+
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -455,8 +457,8 @@
JobSpecification spec = new JobSpecification();
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
- TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider);
+ IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
+ lcManagerProvider, fileSplitProvider, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, drop);
spec.addRoot(drop);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index b1730d3..419e4b0 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -40,8 +40,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -135,7 +135,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 6,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -166,8 +166,8 @@
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
WritableComparator.get(vertexIdClass).getClass());
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProvider, typeTraits,
- indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
@@ -222,18 +222,18 @@
* add the insert operator to insert vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -345,7 +345,7 @@
ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
typeTraits));
IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), true);
ClusterConfig.setLocationConstraint(spec, setUnion);
@@ -363,8 +363,8 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
ClusterConfig.setLocationConstraint(spec, join);
@@ -379,7 +379,7 @@
String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderWrite, typeTraits,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
@@ -448,18 +448,18 @@
* add the insert operator to insert vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 87f2156..27394e6 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -40,8 +40,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -129,7 +129,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -205,18 +205,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -337,7 +337,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -410,18 +410,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 6ea258e..91a629c 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -39,8 +39,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -131,7 +131,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -197,18 +197,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -325,7 +325,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -383,24 +383,23 @@
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
-
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
-
+
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink3);
@@ -425,13 +424,11 @@
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
-
+
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index c6dabb0..e653196 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -39,8 +39,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -128,7 +128,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -211,18 +211,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -339,7 +339,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -419,18 +419,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 97659d4..a5cb6e2 100644
--- a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -50,14 +50,14 @@
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
@@ -65,8 +65,8 @@
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
public class JoinTest {
private final static String ACTUAL_RESULT_DIR = "actual";
@@ -82,7 +82,7 @@
private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
private static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
- private IIndexRegistryProvider<IIndex> treeRegistry = TreeIndexRegistryProvider.INSTANCE;
+ private IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
private IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
private IBinaryHashFunctionFactory stringHashFactory = new PointableBinaryHashFunctionFactory(
@@ -195,8 +195,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -234,8 +234,9 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
TreeIndexCreateOperatorDescriptor writer = new TreeIndexCreateOperatorDescriptor(spec, storageManagerInterface,
- treeRegistry, fileSplitProvider, typeTraits, comparatorFactories, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
spec.addRoot(writer);
runTest(spec);
@@ -278,9 +279,9 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
@@ -353,8 +354,8 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
- storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, keyComparatorFactories, true,
- keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, keyComparatorFactories,
+ true, keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
/** results (already in sorted order) */
@@ -362,8 +363,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -459,8 +460,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -556,7 +557,7 @@
ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
typeTraits));
IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
- storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), true, nullWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
@@ -566,8 +567,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
index 99e55f1..c9f3fe7 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
@@ -23,12 +23,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -53,14 +53,16 @@
private final int outputArity;
public BTreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory,
IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, dataflowHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, dataflowHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
index 3938613..ff95e52 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -106,11 +107,11 @@
* open the function
*/
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexHelper.init(false);
- btree = (BTree) treeIndexHelper.getIndex();
+ treeIndexHelper.open();
+ btree = (BTree) treeIndexHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -120,17 +121,17 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
- writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexHelper.deinit();
+ treeIndexHelper.close();
throw new HyracksDataException(e);
}
}
@@ -203,7 +204,7 @@
*/
functionProxy.functionClose();
} finally {
- treeIndexHelper.deinit();
+ treeIndexHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
index 60559e8..7662aa8 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
@@ -25,13 +25,13 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -61,14 +61,16 @@
private final int outputArity;
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, highKeyFields, opHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -88,7 +90,7 @@
}
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -96,8 +98,10 @@
boolean isRightOuter, INullWriterFactory[] nullWriterFactories, IRecordDescriptorFactory inputRdFactory,
int outputArity, IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, highKeyFields, opHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -120,7 +124,7 @@
}
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -128,8 +132,10 @@
boolean isSetUnion, IRecordDescriptorFactory inputRdFactory, int outputArity,
IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, highKeyFields, opHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 37029f3..61e4649 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -104,12 +105,11 @@
* open the function
*/
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
- btree.open(treeIndexOpHelper.getIndexFileId());
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -140,15 +140,15 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -220,7 +220,7 @@
*/
functionProxy.functionClose();
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
index ed177e3..d237761 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
@@ -24,13 +24,13 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
public class IndexNestedLoopJoinOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -51,12 +51,13 @@
private boolean isSetUnion = false;
public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory) {
- super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -65,14 +66,15 @@
}
public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
boolean isRightOuter, INullWriterFactory[] nullWriterFactories) {
- super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -84,14 +86,15 @@
}
public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
boolean isSetUnion) {
- super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
index bd076d3..8b9bfc2 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
@@ -34,9 +34,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public class IndexNestedLoopJoinOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -86,11 +87,11 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
writer.open();
int lowKeySearchFields = btree.getComparatorFactories().length;
@@ -118,15 +119,15 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
setCursor();
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -196,7 +197,7 @@
throw new HyracksDataException(e);
}
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index f7b3d62..5ca5382 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -38,9 +38,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -116,11 +117,11 @@
* function open
*/
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -147,7 +148,7 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
dos = nullTupleBuilder.getDataOutput();
@@ -157,10 +158,10 @@
nullTupleBuilder.addFieldEndOffset();
}
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -178,7 +179,7 @@
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -273,7 +274,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index 9f1e1ad..d7c5d1f 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -38,9 +38,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public class IndexNestedLoopRightOuterJoinOperatorNodePushable extends
AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -97,10 +98,10 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
writer.open();
@@ -129,13 +130,13 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -151,7 +152,7 @@
}
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -243,7 +244,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 6af60a8..160324e 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -104,11 +105,11 @@
@Override
public void open() throws HyracksDataException {
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -120,11 +121,11 @@
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -141,7 +142,7 @@
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -214,7 +215,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index 615a25b..579935b 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -37,9 +37,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public class IndexNestedLoopSetUnionOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -90,11 +91,11 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
writer.open();
@@ -107,13 +108,13 @@
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -129,7 +130,7 @@
}
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -203,7 +204,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
index 126fcb8..eb5ece6 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
@@ -22,12 +22,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
public class TreeIndexBulkReLoadOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -35,20 +35,21 @@
private final int[] fieldPermutation;
private final IStorageManagerInterface storageManager;
- private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
+ private final IIndexLifecycleManagerProvider lcManagerProvider;
private final IFileSplitProvider fileSplitProvider;
private final float fillFactor;
public TreeIndexBulkReLoadOperatorDescriptor(JobSpecification spec, IStorageManagerInterface storageManager,
- IIndexRegistryProvider<IIndex> treeIndexRegistryProvider, IFileSplitProvider fileSplitProvider,
+ IIndexLifecycleManagerProvider lcManagerProvider, IFileSplitProvider fileSplitProvider,
ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation,
float fillFactor, IIndexDataflowHelperFactory opHelperFactory) {
- super(spec, 1, 0, null, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 0, null, storageManager, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.fieldPermutation = fieldPermutation;
this.storageManager = storageManager;
- this.treeIndexRegistryProvider = treeIndexRegistryProvider;
+ this.lcManagerProvider = lcManagerProvider;
this.fileSplitProvider = fileSplitProvider;
this.fillFactor = fillFactor;
}
@@ -57,6 +58,6 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new TreeIndexBulkReLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor,
- recordDescProvider, storageManager, treeIndexRegistryProvider, fileSplitProvider);
+ recordDescProvider, storageManager, lcManagerProvider, fileSplitProvider);
}
}
\ No newline at end of file
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
index 883fef4..5e089a5 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -20,111 +20,54 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public class TreeIndexBulkReLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
- private final TreeIndexDataflowHelper treeIndexOpHelper;
- private FrameTupleAccessor accessor;
- private IIndexBulkLoadContext bulkLoadCtx;
-
- private IRecordDescriptorProvider recordDescProvider;
- private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
-
- private final IStorageManagerInterface storageManager;
- private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
- private final IFileSplitProvider fileSplitProvider;
- private final int partition;
private final float fillFactor;
- private IHyracksTaskContext ctx;
+ private final TreeIndexDataflowHelper treeIndexOpHelper;
+ private final IIndexOperatorDescriptor opDesc;
+ private final IRecordDescriptorProvider recordDescProvider;
+ private final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+
private ITreeIndex index;
+ private FrameTupleAccessor accessor;
+ private IIndexBulkLoader bulkLoader;
public TreeIndexBulkReLoadOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider) {
+ this.fillFactor = fillFactor;
treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
+ this.opDesc = opDesc;
this.recordDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
-
- this.storageManager = storageManager;
- this.treeIndexRegistryProvider = treeIndexRegistryProvider;
- this.fileSplitProvider = fileSplitProvider;
- this.partition = partition;
- this.ctx = ctx;
- this.fillFactor = fillFactor;
}
@Override
public void open() throws HyracksDataException {
- initDrop();
- init();
- }
-
- private void initDrop() throws HyracksDataException {
- try {
- IndexRegistry<IIndex> treeIndexRegistry = treeIndexRegistryProvider.getRegistry(ctx);
- IBufferCache bufferCache = storageManager.getBufferCache(ctx);
- IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
-
- FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
- int indexFileId = -1;
- boolean fileIsMapped = false;
- synchronized (fileMapProvider) {
- fileIsMapped = fileMapProvider.isMapped(f);
- if (fileIsMapped)
- indexFileId = fileMapProvider.lookupFileId(f);
- }
-
- /**
- * delete the file if it is mapped
- */
- if (fileIsMapped) {
- // Unregister tree instance.
- synchronized (treeIndexRegistry) {
- treeIndexRegistry.unregister(indexFileId);
- }
-
- // remove name to id mapping
- bufferCache.deleteFile(indexFileId, false);
- }
- }
- // TODO: for the time being we don't throw,
- // with proper exception handling (no hanging job problem) we should
- // throw
- catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- private void init() throws HyracksDataException {
- AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexOpHelper
- .getOperatorDescriptor();
RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
+ treeIndexOpHelper.create();
+ treeIndexOpHelper.open();
try {
- treeIndexOpHelper.init(true);
- treeIndexOpHelper.getIndex().open(treeIndexOpHelper.getIndexFileId());
- index = (ITreeIndex) treeIndexOpHelper.getIndex();
- index.open(treeIndexOpHelper.getIndexFileId());
- bulkLoadCtx = index.beginBulkLoad(fillFactor);
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
+ bulkLoader = index.createBulkLoader(fillFactor, false, 0);
} catch (Exception e) {
// cleanup in case of failure
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -135,16 +78,22 @@
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
tuple.reset(accessor, i);
- index.bulkLoadAddTuple(tuple, bulkLoadCtx);
+ try {
+ bulkLoader.add(tuple);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
}
}
@Override
public void close() throws HyracksDataException {
try {
- index.endBulkLoad(bulkLoadCtx);
+ bulkLoader.end();
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 567e220..8d6ab38 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -28,8 +28,8 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -38,20 +38,25 @@
import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-import edu.uci.ics.hyracks.storage.common.smi.TransientFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
+import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceRepository;
import edu.uci.ics.pregelix.api.graph.Vertex;
public class RuntimeContext implements IWorkspaceFileFactory {
private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
- private IndexRegistry<IIndex> treeIndexRegistry;
- private IBufferCache bufferCache;
- private IFileMapManager fileMapManager;
- private Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
- private Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
- private Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
- private IOManager ioManager;
- private Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+ private final IIndexLifecycleManager lcManager;
+ private final ILocalResourceRepository localResourceRepository;
+ private final ResourceIdFactory resourceIdFactory;
+ private final IBufferCache bufferCache;
+ private final IFileMapManager fileMapManager;
+ private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
+ private final Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+ private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
+ private final IOManager ioManager;
+ private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
public RuntimeContext(INCApplicationContext appCtx) {
fileMapManager = new TransientFileMapManager();
@@ -64,8 +69,10 @@
/** let the buffer cache never flush dirty pages */
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000);
- treeIndexRegistry = new IndexRegistry<IIndex>();
ioManager = (IOManager) appCtx.getRootContext().getIOManager();
+ lcManager = new IndexLifecycleManager();
+ localResourceRepository = new TransientLocalResourceRepository();
+ resourceIdFactory = new ResourceIdFactory(0);
}
public void close() {
@@ -80,6 +87,18 @@
System.gc();
}
+ public ILocalResourceRepository getLocalResourceRepository() {
+ return localResourceRepository;
+ }
+
+ public ResourceIdFactory getResourceIdFactory() {
+ return resourceIdFactory;
+ }
+
+ public IIndexLifecycleManager getIndexLifecycleManager() {
+ return lcManager;
+ }
+
public IBufferCache getBufferCache() {
return bufferCache;
}
@@ -88,10 +107,6 @@
return fileMapManager;
}
- public IndexRegistry<IIndex> getTreeIndexRegistry() {
- return treeIndexRegistry;
- }
-
public Map<StateKey, IStateObject> getAppStateStore() {
return appStateMap;
}
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index 79a5c3c..4b4cc2e 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -41,176 +41,168 @@
@SuppressWarnings("deprecation")
public class RunJobTestSuite extends TestSuite {
- private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class
- .getName());
+ private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class.getName());
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
- private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
- private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
- private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
- private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
- private static final String FILE_EXTENSION_OF_RESULTS = "result";
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+ private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+ private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+ private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
+ private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
+ private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/webmap/webmap_link.txt";
- private static final String HDFS_PATH = "/webmap/";
+ private static final String DATA_PATH = "data/webmap/webmap_link.txt";
+ private static final String HDFS_PATH = "/webmap/";
- private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
- private static final String HDFS_PATH2 = "/webmapcomplex/";
+ private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
+ private static final String HDFS_PATH2 = "/webmapcomplex/";
- private static final String DATA_PATH3 = "data/clique/clique.txt";
- private static final String HDFS_PATH3 = "/clique/";
+ private static final String DATA_PATH3 = "data/clique/clique.txt";
+ private static final String HDFS_PATH3 = "/clique/";
- private static final String HYRACKS_APP_NAME = "pregelix";
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
- + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
+ private static final String HYRACKS_APP_NAME = "pregelix";
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
- public void setUp() throws Exception {
- ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
- ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
- cleanupStores();
- PregelixHyracksIntegrationUtil.init();
- PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
- LOGGER.info("Hyracks mini-cluster started");
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
- }
+ public void setUp() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init();
+ PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+ LOGGER.info("Hyracks mini-cluster started");
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_PATH);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(DATA_PATH);
+ Path dest = new Path(HDFS_PATH);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
- src = new Path(DATA_PATH2);
- dest = new Path(HDFS_PATH2);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ src = new Path(DATA_PATH2);
+ dest = new Path(HDFS_PATH2);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
- src = new Path(DATA_PATH3);
- dest = new Path(HDFS_PATH3);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ src = new Path(DATA_PATH3);
+ dest = new Path(HDFS_PATH3);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
- DataOutputStream confOutput = new DataOutputStream(
- new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
- /**
- * cleanup hdfs cluster
- */
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
+ /**
+ * cleanup hdfs cluster
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
- public void tearDown() throws Exception {
- PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
- PregelixHyracksIntegrationUtil.deinit();
- LOGGER.info("Hyracks mini-cluster shut down");
- cleanupHDFS();
- }
+ public void tearDown() throws Exception {
+ PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
+ PregelixHyracksIntegrationUtil.deinit();
+ LOGGER.info("Hyracks mini-cluster shut down");
+ cleanupHDFS();
+ }
- public static Test suite() throws Exception {
- List<String> ignores = getFileList(PATH_TO_IGNORE);
- List<String> onlys = getFileList(PATH_TO_ONLY);
- File testData = new File(PATH_TO_JOBS);
- File[] queries = testData.listFiles();
- RunJobTestSuite testSuite = new RunJobTestSuite();
- testSuite.setUp();
- boolean onlyEnabled = false;
+ public static Test suite() throws Exception {
+ List<String> ignores = getFileList(PATH_TO_IGNORE);
+ List<String> onlys = getFileList(PATH_TO_ONLY);
+ File testData = new File(PATH_TO_JOBS);
+ File[] queries = testData.listFiles();
+ RunJobTestSuite testSuite = new RunJobTestSuite();
+ testSuite.setUp();
+ boolean onlyEnabled = false;
- if (onlys.size() > 0) {
- onlyEnabled = true;
- }
- for (File qFile : queries) {
- if (isInList(ignores, qFile.getName()))
- continue;
+ if (onlys.size() > 0) {
+ onlyEnabled = true;
+ }
+ for (File qFile : queries) {
+ if (isInList(ignores, qFile.getName()))
+ continue;
- if (qFile.isFile()) {
- if (onlyEnabled && !isInList(onlys, qFile.getName())) {
- continue;
- } else {
- String resultFileName = ACTUAL_RESULT_DIR + File.separator
- + jobExtToResExt(qFile.getName());
- String expectedFileName = EXPECTED_RESULT_DIR
- + File.separator + jobExtToResExt(qFile.getName());
- testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH,
- qFile.getName(),
- qFile.getAbsolutePath().toString(), resultFileName,
- expectedFileName));
- }
- }
- }
- return testSuite;
- }
+ if (qFile.isFile()) {
+ if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+ continue;
+ } else {
+ String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
+ String expectedFileName = EXPECTED_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
+ testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile.getAbsolutePath()
+ .toString(), resultFileName, expectedFileName));
+ }
+ }
+ }
+ return testSuite;
+ }
- /**
- * Runs the tests and collects their result in a TestResult.
- */
- @Override
- public void run(TestResult result) {
- try {
- int testCount = countTestCases();
- for (int i = 0; i < testCount; i++) {
- // cleanupStores();
- Test each = this.testAt(i);
- if (result.shouldStop())
- break;
- runTest(each, result);
- }
- tearDown();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
+ /**
+ * Runs the tests and collects their result in a TestResult.
+ */
+ @Override
+ public void run(TestResult result) {
+ try {
+ int testCount = countTestCases();
+ for (int i = 2; i == 2; i++) {
+ // cleanupStores();
+ Test each = this.testAt(i);
+ if (result.shouldStop())
+ break;
+ runTest(each, result);
+ }
+ tearDown();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
- protected static List<String> getFileList(String ignorePath)
- throws FileNotFoundException, IOException {
- BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
- String s = null;
- List<String> ignores = new ArrayList<String>();
- while ((s = reader.readLine()) != null) {
- ignores.add(s);
- }
- reader.close();
- return ignores;
- }
+ protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+ String s = null;
+ List<String> ignores = new ArrayList<String>();
+ while ((s = reader.readLine()) != null) {
+ ignores.add(s);
+ }
+ reader.close();
+ return ignores;
+ }
- private static String jobExtToResExt(String fname) {
- int dot = fname.lastIndexOf('.');
- return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
- }
+ private static String jobExtToResExt(String fname) {
+ int dot = fname.lastIndexOf('.');
+ return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
+ }
- private static boolean isInList(List<String> onlys, String name) {
- for (String only : onlys)
- if (name.indexOf(only) >= 0)
- return true;
- return false;
- }
+ private static boolean isInList(List<String> onlys, String name) {
+ for (String only : onlys)
+ if (name.indexOf(only) >= 0)
+ return true;
+ return false;
+ }
}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java
new file mode 100644
index 0000000..4fce6b3
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+public class IndexLifeCycleManagerProvider implements IIndexLifecycleManagerProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IIndexLifecycleManagerProvider INSTANCE = new IndexLifeCycleManagerProvider();
+
+ private IndexLifeCycleManagerProvider() {
+ }
+
+ @Override
+ public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+ return RuntimeContext.get(ctx).getIndexLifecycleManager();
+ }
+
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
index 57bbfbe..0cce59d 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
@@ -18,6 +18,8 @@
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
public class StorageManagerInterface implements IStorageManagerInterface {
@@ -37,4 +39,14 @@
public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
return RuntimeContext.get(ctx).getFileMapManager();
}
+
+ @Override
+ public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) {
+ return RuntimeContext.get(ctx).getLocalResourceRepository();
+ }
+
+ @Override
+ public ResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
+ return RuntimeContext.get(ctx).getResourceIdFactory();
+ }
}
\ No newline at end of file
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
deleted file mode 100644
index 7d66422..0000000
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.runtime.bootstrap;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
-import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
-
-public class TreeIndexRegistryProvider implements IIndexRegistryProvider<IIndex> {
- private static final long serialVersionUID = 1L;
-
- public static final TreeIndexRegistryProvider INSTANCE = new TreeIndexRegistryProvider();
-
- private TreeIndexRegistryProvider() {
- }
-
- @Override
- public IndexRegistry<IIndex> getRegistry(IHyracksTaskContext ctx) {
- return RuntimeContext.get(ctx).getTreeIndexRegistry();
- }
-}
\ No newline at end of file