Merged fullstack_lsm_staging upto r3336
git-svn-id: https://hyracks.googlecode.com/svn/trunk@3339 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-core/pom.xml b/fullstack/pregelix/pregelix-core/pom.xml
index 576758b..7b247a8 100644
--- a/fullstack/pregelix/pregelix-core/pom.xml
+++ b/fullstack/pregelix/pregelix-core/pom.xml
@@ -18,8 +18,9 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
- <version>2.4</version>
+ <version>2.3.2</version>
<executions>
<execution>
<id>balancer</id>
@@ -72,7 +73,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>appassembler-maven-plugin</artifactId>
- <version>1.3</version>
+ <version>1.3</version>
<executions>
<execution>
<configuration>
@@ -164,7 +165,7 @@
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 3a4c41b..72256f9 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -147,22 +147,11 @@
start = System.currentTimeMillis();
runHDFSWRite(jobGen);
runCleanup(jobGen);
- destroyApplication(applicationName);
end = System.currentTimeMillis();
time = end - start;
LOG.info("result writing finished " + time + "ms");
LOG.info("job finished");
} catch (Exception e) {
- try {
- /**
- * destroy application if there is any exception
- */
- if (hcc != null) {
- destroyApplication(applicationName);
- }
- } catch (Exception e2) {
- throw new HyracksException(e2);
- }
throw new HyracksException(e);
}
}
@@ -220,8 +209,8 @@
private void execute(JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc.startJob(applicationName, job,
- profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ JobId jobId = hcc
+ .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
}
@@ -236,15 +225,11 @@
LOG.info("jar packing finished " + (end - start) + "ms");
start = System.currentTimeMillis();
- hcc.createApplication(applicationName, appZip);
+ // TODO: Fix this step to use Yarn
+ //hcc.createApplication(applicationName, appZip);
end = System.currentTimeMillis();
LOG.info("jar deployment finished " + (end - start) + "ms");
}
-
- public void destroyApplication(String appName) throws Exception {
- hcc.destroyApplication(appName);
- }
-
}
class FileFilter implements FilenameFilter {
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 0b1be61..77fd1a7 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -52,13 +52,13 @@
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -80,8 +80,8 @@
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
@@ -93,7 +93,7 @@
protected static final String PRIMARY_INDEX = "primary";
protected final Configuration conf;
protected final PregelixJob giraphJob;
- protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+ protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
protected int frameSize = ClusterConfig.getFrameSize();
@@ -169,8 +169,9 @@
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeCreate);
return spec;
}
@@ -229,9 +230,9 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
@@ -356,8 +357,8 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -424,9 +425,10 @@
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
+
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -455,8 +457,8 @@
JobSpecification spec = new JobSpecification();
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
- TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider);
+ IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
+ lcManagerProvider, fileSplitProvider, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, drop);
spec.addRoot(drop);
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 9de4c04..fe2fcac 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -40,8 +40,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -135,7 +135,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 6,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -166,8 +166,8 @@
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
WritableComparator.get(vertexIdClass).getClass());
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProvider, typeTraits,
- indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
@@ -222,18 +222,18 @@
* add the insert operator to insert vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -343,7 +343,7 @@
ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
typeTraits));
IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), true);
ClusterConfig.setLocationConstraint(spec, setUnion);
@@ -361,8 +361,8 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
ClusterConfig.setLocationConstraint(spec, join);
@@ -377,7 +377,7 @@
String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderWrite, typeTraits,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
@@ -446,18 +446,18 @@
* add the insert operator to insert vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 91c15b2..f1eceb7 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -40,8 +40,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -129,7 +129,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -205,18 +205,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -335,7 +335,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -408,18 +408,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ee1fd0f..314c393 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -39,8 +39,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -131,7 +131,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -197,18 +197,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -323,7 +323,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -384,18 +384,18 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 628e9ce..0c3db38 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -39,8 +39,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -128,7 +128,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -211,18 +211,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -337,7 +337,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -417,18 +417,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index cd2a864..d099645 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.pregelix.core.util;
-import java.io.File;
import java.util.EnumSet;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -27,6 +26,7 @@
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
public class PregelixHyracksIntegrationUtil {
@@ -45,7 +45,7 @@
private static NodeControllerService nc2;
private static IHyracksClientConnection hcc;
- public static void init(String topologyFilePath) throws Exception {
+ public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
ccConfig.clientNetIpAddress = CC_HOST;
ccConfig.clusterNetIpAddress = CC_HOST;
@@ -54,7 +54,6 @@
ccConfig.defaultMaxJobAttempts = 0;
ccConfig.jobHistorySize = 0;
ccConfig.profileDumpPeriod = -1;
- ccConfig.clusterTopologyDefinition = new File(topologyFilePath);
// cluster controller
cc = new ClusterControllerService(ccConfig);
@@ -68,6 +67,7 @@
ncConfig1.dataIPAddress = "127.0.0.1";
ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
+ ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -78,6 +78,7 @@
ncConfig2.dataIPAddress = "127.0.0.1";
ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
+ ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
@@ -86,14 +87,6 @@
ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
}
- public static void destroyApp(String hyracksAppName) throws Exception {
- hcc.destroyApplication(hyracksAppName);
- }
-
- public static void createApp(String hyracksAppName) throws Exception {
- hcc.createApplication(hyracksAppName, null);
- }
-
public static void deinit() throws Exception {
nc2.stop();
nc1.stop();
@@ -102,7 +95,7 @@
public static void runJob(JobSpecification spec, String appName) throws Exception {
spec.setFrameSize(FRAME_SIZE);
- JobId jobId = hcc.startJob(appName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
hcc.waitForCompletion(jobId);
}
diff --git a/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index f7cadf6..3c00cad 100644
--- a/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -50,14 +50,14 @@
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
@@ -65,8 +65,8 @@
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
public class JoinTest {
private final static String ACTUAL_RESULT_DIR = "actual";
@@ -82,7 +82,7 @@
private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
private static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
- private IIndexRegistryProvider<IIndex> treeRegistry = TreeIndexRegistryProvider.INSTANCE;
+ private IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
private IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
private IBinaryHashFunctionFactory stringHashFactory = new PointableBinaryHashFunctionFactory(
@@ -102,8 +102,7 @@
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
- PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
- PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+ PregelixHyracksIntegrationUtil.init();
FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
@@ -121,7 +120,6 @@
runIndexRightOuterJoin();
TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
- PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
PregelixHyracksIntegrationUtil.deinit();
}
@@ -195,8 +193,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -234,8 +232,9 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
TreeIndexCreateOperatorDescriptor writer = new TreeIndexCreateOperatorDescriptor(spec, storageManagerInterface,
- treeRegistry, fileSplitProvider, typeTraits, comparatorFactories, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
spec.addRoot(writer);
runTest(spec);
@@ -278,9 +277,9 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
@@ -353,8 +352,8 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
- storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, keyComparatorFactories, true,
- keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, keyComparatorFactories,
+ true, keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
/** results (already in sorted order) */
@@ -362,8 +361,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -459,8 +458,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -556,7 +555,7 @@
ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
typeTraits));
IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
- storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), true, nullWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
@@ -566,8 +565,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);