added proper callbacks to operators. passed all existing tests EXCEPT abort test.(The expected result of the existing abort test is different from the one under the entity-level commit)
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@852 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 9258bf0..577592d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -166,10 +166,11 @@
jobSpec, queryField, appContext.getStorageManagerInterface(), secondarySplitsAndConstraint.first,
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListsComparatorFactories, new LSMInvertedIndexDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE),
- queryTokenizerFactory, searchModifierFactory, outputRecDesc, retainInput,
- NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER), queryTokenizerFactory,
+ searchModifierFactory, outputRecDesc, retainInput, NoOpOperationCallbackFactory.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(invIndexSearchOp,
secondarySplitsAndConstraint.second);
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
index 60b7e6c..dec56aa 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
@@ -68,6 +68,7 @@
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -526,9 +527,9 @@
builder.setTypeTraitProvider(format.getTypeTraitProvider());
builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
- JobSpecification spec = compiler.createJob(AsterixAppContextInfoImpl.INSTANCE);
- // set the job event listener
- spec.setJobletEventListenerFactory(new JobEventListenerFactory(asterixJobId, isWriteTransaction));
+ IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(asterixJobId, isWriteTransaction);
+ JobSpecification spec = compiler.createJob(AsterixAppContextInfoImpl.INSTANCE, jobEventListenerFactory);
+
if (pc.isPrintJob()) {
switch (pdf) {
case HTML: {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index b34c52b..09c7458 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -125,32 +125,32 @@
IIndexDataflowHelperFactory dfhFactory;
switch (index.getIndexType()) {
case BTREE:
- dfhFactory = new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE);
+ dfhFactory = new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
break;
case RTREE:
dfhFactory = new LSMRTreeDataflowHelperFactory(
new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
- new IBinaryComparatorFactory[] { null }, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, null);
+ new IBinaryComparatorFactory[] { null }, AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
break;
case NGRAM_INVIX:
case WORD_INVIX:
dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE);
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
break;
default:
throw new AsterixException("Unknown index type provided.");
}
IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
idxSplitsAndConstraint.first, dfhFactory);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
idxSplitsAndConstraint.second);
@@ -166,10 +166,10 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE));
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER));
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
splitsAndConstraint.second);
@@ -200,10 +200,10 @@
//TODO replace this transient one to the persistent one
TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
splitsAndConstraint.first, typeTraits, comparatorFactories, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER),
localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
splitsAndConstraint.second);
@@ -277,11 +277,11 @@
LOGGER.info("LOAD into File Splits: " + sb.toString());
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, false, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER),
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index 8b2d583..798bc34 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -46,10 +46,12 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = datasetDecls
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE));
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER));
AlgebricksPartitionConstraintHelper
.setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
spec.addRoot(btreeDrop);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index 3ecb365..e9c56db 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -31,11 +31,12 @@
//TODO replace this transient one to the persistent one
TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
- new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE), localResourceFactoryProvider,
+ new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
secondaryPartitionConstraint);
@@ -68,9 +69,10 @@
// Create secondary BTree bulk load op.
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, numSecondaryKeys,
- new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE), BTree.DEFAULT_FILL_FACTOR);
+ new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 169297a..ab61bc4 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -224,11 +224,14 @@
// +Infinity
int[] highKeyFields = null;
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories, lowKeyFields,
- highKeyFields, true, true, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE), false, NoOpOperationCallbackFactory.INSTANCE);
+ highKeyFields, true, true, new LSMBTreeDataflowHelperFactory(
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), false,
+ NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
return primarySearchOp;
@@ -283,7 +286,7 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
secondarySplitsAndConstraint.first, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
fieldPermutation, fillFactor, false, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 681059f..2ac2325 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -121,12 +121,13 @@
//TODO replace the transient one to persistent one
ILocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.INSTANCE, secondaryFileSplitProvider,
- AsterixRuntimeComponentsProvider.INSTANCE, tokenTypeTraits, tokenComparatorFactories,
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, secondaryFileSplitProvider,
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
- new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE), localResourceFactoryProvider,
+ new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
secondaryPartitionConstraint);
@@ -216,12 +217,14 @@
fieldPermutation[i] = i;
}
LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
- spec, fieldPermutation, false, AsterixRuntimeComponentsProvider.INSTANCE, secondaryFileSplitProvider,
- AsterixRuntimeComponentsProvider.INSTANCE, tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
- new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE), NoOpOperationCallbackFactory.INSTANCE);
+ spec, fieldPermutation, false, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ secondaryFileSplitProvider, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits,
+ tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
+ new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER),
+ NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
secondaryPartitionConstraint);
return invIndexBulkLoadOp;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index f650899..f7be12f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -54,13 +54,14 @@
//TODO replace this transient one to the persistent one
TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
- primaryComparatorFactories, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
- secondaryComparatorFactories.length)), localResourceFactoryProvider,
+ primaryComparatorFactories, AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+ keyType, secondaryComparatorFactories.length)), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
secondaryPartitionConstraint);
@@ -139,10 +140,11 @@
spec,
numNestedSecondaryKeyFields,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
- primaryComparatorFactories, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
- secondaryComparatorFactories.length)), BTree.DEFAULT_FILL_FACTOR);
+ primaryComparatorFactories, AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+ keyType, secondaryComparatorFactories.length)), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestKeywordIndexJob.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestKeywordIndexJob.java
deleted file mode 100644
index bb2c313..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestKeywordIndexJob.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package edu.uci.ics.asterix.file;
-
-import java.io.DataOutput;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AObjectSerializerDeserializer;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-
-public class TestKeywordIndexJob {
-
- private static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
- static {
- typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
- }
-
- public static int DEFAULT_INPUT_DATA_COLUMN = 0;
- public static LogicalVariable METADATA_DUMMY_VAR = new LogicalVariable(-1);
-
- @SuppressWarnings("unchecked")
- public JobSpecification createJobSpec() throws AsterixException, HyracksDataException {
-
- JobSpecification spec = new JobSpecification();
-
- // ---------- START GENERAL BTREE STUFF
-
- // ---------- END GENERAL BTREE STUFF
-
- List<String> nodeGroup = new ArrayList<String>();
- nodeGroup.add("nc1");
- nodeGroup.add("nc2");
-
- // ---------- START KEY PROVIDER OP
-
- // TODO: should actually be empty tuple source
- // build tuple containing low and high search keys
- ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
- DataOutput dos = tb.getDataOutput();
-
- tb.reset();
- AObjectSerializerDeserializer.INSTANCE.serialize(new AString("Jodi Rotruck"), dos); // dummy
- // field
- tb.addFieldEndOffset();
-
- ISerializerDeserializer[] keyRecDescSers = { AObjectSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- String[] keyProviderOpLocationConstraint = new String[nodeGroup.size()];
- for (int p = 0; p < nodeGroup.size(); p++) {
- keyProviderOpLocationConstraint[p] = nodeGroup.get(p);
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, keyProviderOpLocationConstraint);
-
- // ---------- END KEY PROVIDER OP
-
- // ---------- START SECONRARY INDEX SCAN
-
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[2];
- secondaryTypeTraits[0] = new ITypeTraits() {
-
- @Override
- public boolean isFixedLength() {
- return false;
- }
-
- @Override
- public int getFixedLength() {
- return -1;
- }
- };
-
- secondaryTypeTraits[1] = new ITypeTraits() {
-
- @Override
- public boolean isFixedLength() {
- return true;
- }
-
- @Override
- public int getFixedLength() {
- return 5;
- }
- };
-
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[2];
- secondaryRecFields[0] = AObjectSerializerDeserializer.INSTANCE;
- secondaryRecFields[1] = AObjectSerializerDeserializer.INSTANCE;
- IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
- secondaryComparatorFactories[0] = AObjectAscBinaryComparatorFactory.INSTANCE;
- secondaryComparatorFactories[1] = AObjectAscBinaryComparatorFactory.INSTANCE;
-
- int[] lowKeyFields = null;
- int[] highKeyFields = null;
- RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
- // TODO: change file splits according to mount points in cluster config
- IFileSplitProvider secondarySplitProvider = new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit("nc1", new FileReference(new File("/tmp/nc1/demo1112/Customers_idx_NameInvIndex"))),
- new FileSplit("nc2", new FileReference(new File("/tmp/nc2/demo1112/Customers_idx_NameInvIndex"))) });
- BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- secondarySplitProvider, secondaryTypeTraits, secondaryComparatorFactories, lowKeyFields, highKeyFields,
- true, true, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE), false, NoOpOperationCallbackFactory.INSTANCE);
- String[] secondarySearchOpLocationConstraint = new String[nodeGroup.size()];
- for (int p = 0; p < nodeGroup.size(); p++) {
- secondarySearchOpLocationConstraint[p] = nodeGroup.get(p);
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp,
- secondarySearchOpLocationConstraint);
-
- // ---------- END SECONDARY INDEX SCAN
-
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- String[] printerLocationConstraint = new String[nodeGroup.size()];
- for (int p = 0; p < nodeGroup.size(); p++) {
- printerLocationConstraint[p] = nodeGroup.get(p);
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, printerLocationConstraint);
-
- // ---------- START CONNECT THE OPERATORS
-
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondarySearchOp, 0);
-
- spec.connect(new OneToOneConnectorDescriptor(spec), secondarySearchOp, 0, printer, 0);
-
- // ---------- END CONNECT THE OPERATORS
-
- spec.addRoot(printer);
-
- return spec;
- }
-
- public static void main(String[] args) throws Exception {
- String host;
- String appName;
- String ddlFile;
-
- switch (args.length) {
- case 0: {
- host = "127.0.0.1";
- appName = "asterix";
- ddlFile = "/home/abehm/workspace/asterix/src/test/resources/demo0927/local/create-index.aql";
- System.out.println("No arguments specified, using defauls:");
- System.out.println("HYRACKS HOST: " + host);
- System.out.println("APPNAME: " + appName);
- System.out.println("DDLFILE: " + ddlFile);
- }
- break;
-
- case 3: {
- host = args[0];
- appName = args[1];
- ddlFile = args[2];
- }
- break;
-
- default: {
- System.out.println("USAGE:");
- System.out.println("ARG 1: Hyracks Host (IP or Hostname)");
- System.out.println("ARG 2: Application Name (e.g., asterix)");
- System.out.println("ARG 3: DDL File");
- host = null;
- appName = null;
- ddlFile = null;
- System.exit(0);
- }
- break;
- }
-
- int port = 1098;
- IHyracksClientConnection hcc = new HyracksConnection(host, port);
-
- TestKeywordIndexJob tij = new TestKeywordIndexJob();
- JobSpecification jobSpec = tij.createJobSpec();
-
- long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob("asterix", jobSpec);
- hcc.waitForCompletion(jobId);
- long end = System.currentTimeMillis();
- System.err.println(start + " " + end + " " + (end - start));
- }
-}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestSecondaryIndexJob.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestSecondaryIndexJob.java
deleted file mode 100644
index 29d5273..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestSecondaryIndexJob.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package edu.uci.ics.asterix.file;
-
-import java.io.DataOutput;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AObjectSerializerDeserializer;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-
-public class TestSecondaryIndexJob {
-
- private static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
- static {
- typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
- }
-
- public static int DEFAULT_INPUT_DATA_COLUMN = 0;
- public static LogicalVariable METADATA_DUMMY_VAR = new LogicalVariable(-1);
-
- @SuppressWarnings("unchecked")
- public JobSpecification createJobSpec() throws AsterixException, HyracksDataException {
-
- JobSpecification spec = new JobSpecification();
-
- // ---------- START GENERAL BTREE STUFF
-
- // ---------- END GENERAL BTREE STUFF
-
- List<String> nodeGroup = new ArrayList<String>();
- nodeGroup.add("nc1");
- nodeGroup.add("nc2");
-
- // ---------- START KEY PROVIDER OP
-
- // TODO: should actually be empty tuple source
- // build tuple containing low and high search keys
- ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
- DataOutput dos = tb.getDataOutput();
-
- tb.reset();
- AObjectSerializerDeserializer.INSTANCE.serialize(new AString("Jodi Rotruck"), dos); // dummy
- // field
- tb.addFieldEndOffset();
-
- ISerializerDeserializer[] keyRecDescSers = { AObjectSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- String[] keyProviderOpLocationConstraint = new String[nodeGroup.size()];
- for (int p = 0; p < nodeGroup.size(); p++) {
- keyProviderOpLocationConstraint[p] = nodeGroup.get(p);
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, keyProviderOpLocationConstraint);
-
- // ---------- END KEY PROVIDER OP
-
- // ---------- START SECONRARY INDEX SCAN
-
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[2];
- secondaryTypeTraits[0] = new ITypeTraits() {
-
- @Override
- public boolean isFixedLength() {
- return false;
- }
-
- @Override
- public int getFixedLength() {
- return -1;
- }
- };
-
- secondaryTypeTraits[1] = new ITypeTraits() {
-
- @Override
- public boolean isFixedLength() {
- return true;
- }
-
- @Override
- public int getFixedLength() {
- return 5;
- }
- };
-
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[2];
- secondaryRecFields[0] = AObjectSerializerDeserializer.INSTANCE;
- secondaryRecFields[1] = AObjectSerializerDeserializer.INSTANCE;
- IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
- secondaryComparatorFactories[0] = AObjectAscBinaryComparatorFactory.INSTANCE;
- secondaryComparatorFactories[1] = AObjectAscBinaryComparatorFactory.INSTANCE;
-
- int[] lowKeyFields = null; // -infinity
- int[] highKeyFields = null; // +infinity
- RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
- // TODO: change file splits according to mount points in cluster config
- IFileSplitProvider secondarySplitProvider = new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit("nc1", new FileReference(new File("/tmp/nc1/demo1112/Customers_idx_NameBtreeIndex"))),
- new FileSplit("nc2", new FileReference(new File("/tmp/nc2/demo1112/Customers_idx_NameBtreeIndex"))) });
- BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- secondarySplitProvider, secondaryTypeTraits, secondaryComparatorFactories, lowKeyFields, highKeyFields,
- true, true, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE), false, NoOpOperationCallbackFactory.INSTANCE);
- String[] secondarySearchOpLocationConstraint = new String[nodeGroup.size()];
- for (int p = 0; p < nodeGroup.size(); p++) {
- secondarySearchOpLocationConstraint[p] = nodeGroup.get(p);
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp,
- secondarySearchOpLocationConstraint);
-
- // ---------- END SECONDARY INDEX SCAN
-
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- String[] printerLocationConstraint = new String[nodeGroup.size()];
- for (int p = 0; p < nodeGroup.size(); p++) {
- printerLocationConstraint[p] = nodeGroup.get(p);
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, printerLocationConstraint);
-
- // ---------- START CONNECT THE OPERATORS
-
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondarySearchOp, 0);
-
- spec.connect(new OneToOneConnectorDescriptor(spec), secondarySearchOp, 0, printer, 0);
-
- // ---------- END CONNECT THE OPERATORS
-
- spec.addRoot(printer);
-
- return spec;
- }
-
- public static void main(String[] args) throws Exception {
- String host;
- String appName;
- String ddlFile;
-
- switch (args.length) {
- case 0: {
- host = "127.0.0.1";
- appName = "asterix";
- ddlFile = "/home/nicnic/workspace/asterix/trunk/asterix/asterix-app/src/test/resources/demo0927/local/create-index.aql";
- System.out.println("No arguments specified, using defauls:");
- System.out.println("HYRACKS HOST: " + host);
- System.out.println("APPNAME: " + appName);
- System.out.println("DDLFILE: " + ddlFile);
- }
- break;
-
- case 3: {
- host = args[0];
- appName = args[1];
- ddlFile = args[2];
- }
- break;
-
- default: {
- System.out.println("USAGE:");
- System.out.println("ARG 1: Hyracks Host (IP or Hostname)");
- System.out.println("ARG 2: Application Name (e.g., asterix)");
- System.out.println("ARG 3: DDL File");
- host = null;
- appName = null;
- ddlFile = null;
- System.exit(0);
- }
- break;
- }
-
- int port = 1098;
- IHyracksClientConnection hcc = new HyracksConnection(host, port);
-
- TestSecondaryIndexJob tij = new TestSecondaryIndexJob();
- JobSpecification jobSpec = tij.createJobSpec();
-
- long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob("asterix", jobSpec);
- hcc.waitForCompletion(jobId);
- long end = System.currentTimeMillis();
- System.err.println(start + " " + end + " " + (end - start));
- }
-}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfoImpl.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfoImpl.java
index dca790d..cef79bf 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfoImpl.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfoImpl.java
@@ -19,7 +19,7 @@
@Override
public IStorageManagerInterface getStorageManagerInterface() {
- return AsterixRuntimeComponentsProvider.INSTANCE;
+ return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
}
public static void setNodeControllerInfo(Map<String, Set<String>> nodeControllerInfo) {
@@ -32,7 +32,7 @@
@Override
public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() {
- return AsterixRuntimeComponentsProvider.INSTANCE;
+ return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java
index a5684fc..ca45c89 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java
@@ -1,10 +1,15 @@
package edu.uci.ics.asterix.common.context;
+import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -12,23 +17,34 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
-public enum AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
- ILSMIOOperationSchedulerProvider, ILSMFlushControllerProvider, ILSMMergePolicyProvider, ILSMOperationTrackerFactory {
- INSTANCE;
+public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
+ ILSMIOOperationSchedulerProvider, ILSMFlushControllerProvider, ILSMMergePolicyProvider,
+ ILSMOperationTrackerFactory {
+ private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
+
+ public static final AsterixRuntimeComponentsProvider LSMBTREE_PROVIDER = new AsterixRuntimeComponentsProvider(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE);
+ public static final AsterixRuntimeComponentsProvider LSMRTREE_PROVIDER = new AsterixRuntimeComponentsProvider(
+ LSMRTreeIOOperationCallbackFactory.INSTANCE);
+ public static final AsterixRuntimeComponentsProvider LSMINVERTEDINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE);
+ public static final AsterixRuntimeComponentsProvider NOINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(null);
+
+ private AsterixRuntimeComponentsProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ this.ioOpCallbackFactory = ioOpCallbackFactory;
+ }
@Override
public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
- // TODO: Replace this with the IndexOperationTracker, once the other transactional components are in place.
- return new ReferenceCountingOperationTracker(index);
+ return new IndexOperationTracker(index, ioOpCallbackFactory);
}
-
+
@Override
public ILSMFlushController getFlushController(IHyracksTaskContext ctx) {
return ((AsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 26148bb..55a62cf 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -130,6 +130,10 @@
.getTransactionalResourceRepository();
resourceRepository.registerTransactionalResourceManager(ResourceType.LSM_BTREE, new IndexResourceManager(
ResourceType.LSM_BTREE, runtimeContext.getTransactionSubsystem()));
+ resourceRepository.registerTransactionalResourceManager(ResourceType.LSM_RTREE, new IndexResourceManager(
+ ResourceType.LSM_RTREE, runtimeContext.getTransactionSubsystem()));
+ resourceRepository.registerTransactionalResourceManager(ResourceType.LSM_INVERTED_INDEX,
+ new IndexResourceManager(ResourceType.LSM_INVERTED_INDEX, runtimeContext.getTransactionSubsystem()));
metadataNodeName = asterixProperties.getMetadataNodeName();
isNewUniverse = asterixProperties.isNewUniverse();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index fa428bb..5e56779 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -16,11 +16,13 @@
package edu.uci.ics.asterix.metadata.declared;
import java.io.File;
+import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
@@ -46,6 +48,12 @@
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.asterix.runtime.base.AsterixTupleFilterFactory;
+import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -70,6 +78,7 @@
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -83,6 +92,7 @@
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
@@ -319,12 +329,34 @@
} catch (Exception e) {
throw new AlgebricksException(e);
}
+
+ ISearchOperationCallbackFactory searchCallbackFactory = null;
+ if (isSecondary) {
+ searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
+ } else {
+ JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ int[] primaryKeyFields = new int[numPrimaryKeys];
+ List<LogicalVariable> primaryKeyVars = new ArrayList<LogicalVariable>();
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ primaryKeyFields[i] = i;
+ primaryKeyVars.add(new LogicalVariable(outputVars.get(i).getId()));
+ }
+ IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories = JobGenHelper
+ .variablesToBinaryHashFunctionFactories(primaryKeyVars, typeEnv, context);
+
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+ entityIdFieldHashFunctionFactories, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ }
+
BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
typeTraits, comparatorFactories, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
- new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE), retainInput, NoOpOperationCallbackFactory.INSTANCE);
+ new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), retainInput, searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
}
@@ -403,13 +435,17 @@
RecordDescriptor recDesc = new RecordDescriptor(recordFields);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+
+ ISearchOperationCallbackFactory searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, recDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
typeTraits, comparatorFactories, keyFields, new LSMRTreeDataflowHelperFactory(valueProviderFactories,
- RTreePolicyType.RTREE, primaryComparatorFactories, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
- comparatorFactories.length)), false, NoOpOperationCallbackFactory.INSTANCE);
+ RTreePolicyType.RTREE, primaryComparatorFactories,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
+ nestedKeyType.getTypeTag(), comparatorFactories.length)), false, searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
}
@@ -515,20 +551,25 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+
+ //TODO
+ //figure out the right behavior of the bulkload and then give the right callback
+ //(ex. what's the expected behavior when there is an error during bulkload?)
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, false, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE),
- NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), NoOpOperationCallbackFactory.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
- IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> keys, LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
String datasetName = dataSource.getId().getDatasetName();
int numKeys = keys.size();
// Move key fields to front.
@@ -558,39 +599,55 @@
itemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+
+ //prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ int[] primaryKeyFields = new int[numKeys];
+ for (i = 0; i < numKeys; i++) {
+ primaryKeyFields[i] = i;
+ }
+ IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories = JobGenHelper
+ .variablesToBinaryHashFunctionFactories(keys, typeEnv, context);
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ PrimaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new PrimaryIndexModificationOperationCallbackFactory(
+ jobId, datasetId, primaryKeyFields, entityIdFieldHashFunctionFactories, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_BTREE);
+
TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, fieldPermutation, indexOp, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE), null,
- NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), null, modificationCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
- IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
- return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload, recordDesc,
- context, spec);
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> keys, LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
+ recordDesc, context, spec);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
- IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
- return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload, recordDesc,
- context, spec);
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> keys, LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, typeEnv, keys, payload,
+ recordDesc, context, spec);
}
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(IndexOperation indexOp,
- IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
- IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec) throws AlgebricksException {
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(
+ IndexOperation indexOp, IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
+ RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
Dataset dataset = metadata.findDataset(datasetName);
@@ -601,12 +658,12 @@
AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
switch (secondaryIndex.getIndexType()) {
case BTREE: {
- return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
- filterFactory, recordDesc, context, spec, indexOp);
+ return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, typeEnv, primaryKeys,
+ secondaryKeys, filterFactory, recordDesc, context, spec, indexOp);
}
case RTREE: {
- return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
- filterFactory, recordDesc, context, spec, indexOp);
+ return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, typeEnv, primaryKeys,
+ secondaryKeys, filterFactory, recordDesc, context, spec, indexOp);
}
default: {
throw new AlgebricksException("Insert and delete not implemented for index type: "
@@ -621,8 +678,8 @@
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec) throws AlgebricksException {
- return getIndexInsertOrDeleteRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
- primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
+ return getIndexInsertOrDeleteRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas,
+ typeEnv, primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
}
@Override
@@ -631,8 +688,8 @@
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec) throws AlgebricksException {
- return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
- primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
+ return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
+ typeEnv, primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
}
private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
@@ -649,9 +706,10 @@
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String datasetName,
- String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec, IndexOperation indexOp) throws AlgebricksException {
+ String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec, IndexOperation indexOp) throws AlgebricksException {
int numKeys = primaryKeys.size() + secondaryKeys.size();
// generate field permutations
int[] fieldPermutation = new int[numKeys];
@@ -703,20 +761,40 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+
+ //prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ int[] primaryKeyFields = new int[primaryKeys.size()];
+ i = 0;
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ primaryKeyFields[i] = idx;
+ i++;
+ }
+ IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories = JobGenHelper
+ .variablesToBinaryHashFunctionFactories(primaryKeys, typeEnv, context);
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
+ jobId, datasetId, primaryKeyFields, entityIdFieldHashFunctionFactories, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_BTREE);
+
TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, fieldPermutation, indexOp, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE),
- filterFactory, NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), filterFactory, modificationCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String datasetName,
- String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec, IndexOperation indexOp) throws AlgebricksException {
+ String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec, IndexOperation indexOp) throws AlgebricksException {
Dataset dataset = metadata.findDataset(datasetName);
String itemTypeName = dataset.getItemTypeName();
IAType itemType = metadata.findType(itemTypeName);
@@ -768,15 +846,35 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+
+ //prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ int[] primaryKeyFields = new int[numKeys];
+ i = 0;
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ primaryKeyFields[i] = idx;
+ i++;
+ }
+ IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories = JobGenHelper
+ .variablesToBinaryHashFunctionFactories(primaryKeys, typeEnv, context);
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
+ jobId, datasetId, primaryKeyFields, entityIdFieldHashFunctionFactories, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_RTREE);
+
TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, fieldPermutation, indexOp, new LSMRTreeDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
- proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length)), filterFactory,
- NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
+ nestedKeyType.getTypeTag(), comparatorFactories.length)), filterFactory,
+ modificationCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
index 471a0b8..45e19c6 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -22,6 +22,10 @@
this.jobId = jobId;
this.transactionalWrite = transactionalWrite;
}
+
+ public JobId getJobId() {
+ return jobId;
+ }
@Override
public IJobletEventListener createListener(final IHyracksJobletContext jobletContext) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 8a38526..a51da07 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -22,6 +22,11 @@
public class LSMBTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
private static final long serialVersionUID = 1L;
+
+ public static LSMBTreeIOOperationCallbackFactory INSTANCE = new LSMBTreeIOOperationCallbackFactory();
+
+ private LSMBTreeIOOperationCallbackFactory() {
+ }
@Override
public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index d8b796c..790c60c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -23,6 +23,11 @@
private static final long serialVersionUID = 1L;
+ public static LSMInvertedIndexIOOperationCallbackFactory INSTANCE = new LSMInvertedIndexIOOperationCallbackFactory();
+
+ private LSMInvertedIndexIOOperationCallbackFactory() {
+ }
+
@Override
public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
return new LSMInvertedIndexIOOperationCallback((IndexOperationTracker) syncObj);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 2a8af33..4b47a95 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -22,6 +22,11 @@
public class LSMRTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
private static final long serialVersionUID = 1L;
+
+ public static LSMRTreeIOOperationCallbackFactory INSTANCE = new LSMRTreeIOOperationCallbackFactory();
+
+ private LSMRTreeIOOperationCallbackFactory() {
+ }
@Override
public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
index 066a611..1f018b4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
@@ -20,6 +20,7 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.FieldsHashValueGenerator;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
public abstract class AbstractOperationCallback {
@@ -29,12 +30,20 @@
protected final ILockManager lockManager;
protected final TransactionContext txnCtx;
protected int transactorLocalNumActiveOperations = 0;
-
- public AbstractOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
- IBinaryHashFunction[] primaryKeyHashFunctions, TransactionContext txnCtx, ILockManager lockManager) {
- this.datasetId = datasetId;
+
+ public AbstractOperationCallback(int datasetId, int[] primaryKeyFields,
+ IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories, TransactionContext txnCtx,
+ ILockManager lockManager) {
+ this.datasetId = new DatasetId(datasetId);
this.primaryKeyFields = primaryKeyFields;
- this.primaryKeyHashFunctions = primaryKeyHashFunctions;
+ if (primaryKeyHashFunctionFactories != null) {
+ this.primaryKeyHashFunctions = new IBinaryHashFunction[primaryKeyHashFunctionFactories.length];
+ for (int i = 0; i < primaryKeyHashFunctionFactories.length; ++i) {
+ this.primaryKeyHashFunctions[i] = primaryKeyHashFunctionFactories[i].createBinaryHashFunction();
+ }
+ } else {
+ this.primaryKeyHashFunctions = null;
+ }
this.txnCtx = txnCtx;
this.lockManager = lockManager;
}
@@ -43,19 +52,19 @@
IBinaryHashFunction[] primaryKeyHashFunctions) {
return FieldsHashValueGenerator.computeFieldsHashValue(tuple, primaryKeyFields, primaryKeyHashFunctions);
}
-
+
public TransactionContext getTransactionContext() {
return txnCtx;
}
-
+
public int getLocalNumActiveOperations() {
return transactorLocalNumActiveOperations;
}
-
+
public void incrementLocalNumActiveOperations() {
transactorLocalNumActiveOperations++;
}
-
+
public void decrementLocalNumActiveOperations() {
transactorLocalNumActiveOperations--;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java
index 8d11d04..84c747b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java
@@ -15,24 +15,28 @@
package edu.uci.ics.asterix.transaction.management.opcallbacks;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import java.io.Serializable;
+
import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionSubsystemProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-public abstract class AbstractOperationCallbackFactory {
+public abstract class AbstractOperationCallbackFactory implements Serializable{
protected final JobId jobId;
- protected final DatasetId datasetId;
+ protected final int datasetId;
protected final int[] primaryKeyFields;
- protected final IBinaryHashFunction[] primaryKeyHashFunctions;
+ protected final IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories;
protected final ITransactionSubsystemProvider txnSubsystemProvider;
+ protected final byte resourceType;
- public AbstractOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] primaryKeyFields,
- IBinaryHashFunction[] primaryKeyHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider) {
+ public AbstractOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+ IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories,
+ ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
this.jobId = jobId;
this.datasetId = datasetId;
this.primaryKeyFields = primaryKeyFields;
- this.primaryKeyHashFunctions = primaryKeyHashFunctions;
+ this.primaryKeyHashFunctionFactories = primaryKeyHashFunctionFactories;
this.txnSubsystemProvider = txnSubsystemProvider;
+ this.resourceType = resourceType;
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
index a34e6dd..67bf68a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
@@ -16,13 +16,13 @@
package edu.uci.ics.asterix.transaction.management.opcallbacks;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
public class IndexOperationTracker implements ILSMOperationTracker {
@@ -38,12 +38,16 @@
this.index = index;
accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- ioOpCallback = ioOpCallbackFactory.createIOOperationCallback(this);
+ if (ioOpCallbackFactory != null) {
+ ioOpCallback = ioOpCallbackFactory.createIOOperationCallback(this);
+ } else {
+ ioOpCallback = null;
+ }
}
@Override
- public synchronized boolean beforeOperation(ILSMIndexOperationContext opCtx, boolean tryOperation)
- throws HyracksDataException {
+ public synchronized boolean beforeOperation(ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback, boolean tryOperation) throws HyracksDataException {
// Wait for pending flushes to complete.
// If flushFlag is set, then the flush is queued to occur by the last completing operation.
// This operation should wait for that flush to occur before proceeding.
@@ -60,28 +64,32 @@
numActiveOperations++;
// Increment transactor-local active operations count.
- AbstractOperationCallback opCallback = getOperationCallback(opCtx);
- opCallback.incrementLocalNumActiveOperations();
-
+ AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
+ if (opCallback != null) {
+ opCallback.incrementLocalNumActiveOperations();
+ }
return true;
}
@Override
- public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ public void afterOperation(ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
// Searches are immediately considered complete, because they should not prevent the execution of flushes.
- IndexOperation op = opCtx.getOperation();
- if (op == IndexOperation.SEARCH || op == IndexOperation.DISKORDERSCAN) {
- completeOperation(opCtx);
+ if (searchCallback != null) {
+ completeOperation(searchCallback, modificationCallback);
}
}
@Override
- public synchronized void completeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ public synchronized void completeOperation(ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
numActiveOperations--;
// Decrement transactor-local active operations count.
- AbstractOperationCallback opCallback = getOperationCallback(opCtx);
- opCallback.decrementLocalNumActiveOperations();
+ AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
+ if (opCallback != null) {
+ opCallback.decrementLocalNumActiveOperations();
+ }
// If we need a flush, and this is the last completing operation, then schedule the flush.
// Once the flush has completed notify all waiting operations.
@@ -90,11 +98,16 @@
}
}
- private AbstractOperationCallback getOperationCallback(ILSMIndexOperationContext opCtx) {
- if (opCtx.getSearchOperationCallback() != null) {
- return (AbstractOperationCallback) opCtx.getSearchOperationCallback();
+ private AbstractOperationCallback getOperationCallback(ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) {
+
+ if (searchCallback == NoOpOperationCallback.INSTANCE || modificationCallback == NoOpOperationCallback.INSTANCE) {
+ return null;
+ }
+ if (searchCallback != null) {
+ return (AbstractOperationCallback) searchCallback;
} else {
- return (AbstractOperationCallback) opCtx.getModificationCallback();
+ return (AbstractOperationCallback) modificationCallback;
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 3c9f092..d3c7eb0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -18,12 +18,11 @@
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -38,14 +37,16 @@
IModificationOperationCallback {
protected final long resourceId;
+ protected final byte resourceType;
protected final IndexOperation indexOp;
protected final TransactionSubsystem txnSubsystem;
- public PrimaryIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
- IBinaryHashFunction[] primaryKeyHashFunctions, TransactionContext txnCtx, ILockManager lockManager,
- TransactionSubsystem txnSubsystem, long resourceId, IndexOperation indexOp) {
- super(datasetId, primaryKeyFields, primaryKeyHashFunctions, txnCtx, lockManager);
+ public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
+ IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories, TransactionContext txnCtx,
+ ILockManager lockManager, TransactionSubsystem txnSubsystem, long resourceId, byte resourceType, IndexOperation indexOp) {
+ super(datasetId, primaryKeyFields, primaryKeyHashFunctionFactories, txnCtx, lockManager);
this.resourceId = resourceId;
+ this.resourceType = resourceType;
this.indexOp = indexOp;
this.txnSubsystem = txnSubsystem;
}
@@ -62,11 +63,11 @@
@Override
public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
- IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, ResourceType.LSM_BTREE);
+ IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, resourceType);
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields, primaryKeyHashFunctions);
LSMBTreeTupleReference lsmBTreeTuple = (LSMBTreeTupleReference) before;
IndexOperation oldOp = IndexOperation.INSERT;
- if (lsmBTreeTuple.isAntimatter()) {
+ if (lsmBTreeTuple != null && lsmBTreeTuple.isAntimatter()) {
oldOp = IndexOperation.DELETE;
}
try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index a11d00e..be972a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -16,13 +16,16 @@
package edu.uci.ics.asterix.transaction.management.opcallbacks;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionSubsystemProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
@@ -38,24 +41,31 @@
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] primaryKeyFields,
- IBinaryHashFunction[] primaryKeyHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider,
- IndexOperation indexOp) {
- super(jobId, datasetId, primaryKeyFields, primaryKeyHashFunctions, txnSubsystemProvider);
+ public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+ IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories,
+ ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+ super(jobId, datasetId, primaryKeyFields, primaryKeyHashFunctionFactories, txnSubsystemProvider, resourceType);
this.indexOp = indexOp;
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(long resourceId, IHyracksTaskContext ctx)
- throws HyracksDataException {
+ public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
+ IHyracksTaskContext ctx) throws HyracksDataException {
TransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- ILSMIndex index = (ILSMIndex) txnSubsystem.getTransactionalResourceRepository().getTransactionalResource(
- resourceId);
+ TransactionalResourceRepository txnResourceRepository = txnSubsystem.getTransactionalResourceRepository();
+ ILSMIndex index = (ILSMIndex) txnResourceRepository.getTransactionalResource(resourceId);
+
+ //register the resource if it is not registered
+ if (index == null) {
+ txnSubsystem.getTransactionalResourceRepository().registerTransactionalResource(resourceId, resource);
+ index = (ILSMIndex) resource;
+ }
+
try {
TransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
- primaryKeyFields, primaryKeyHashFunctions, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
- resourceId, indexOp);
+ primaryKeyFields, primaryKeyHashFunctionFactories, txnCtx, txnSubsystem.getLockManager(),
+ txnSubsystem, resourceId, resourceType, indexOp);
txnCtx.registerIndexAndCallback(index, (AbstractOperationCallback) modCallback);
return modCallback;
} catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index d055126..85e8980 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -17,10 +17,9 @@
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -30,9 +29,10 @@
*/
public class PrimaryIndexSearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
- public PrimaryIndexSearchOperationCallback(DatasetId datasetId, int[] entityIdFields,
- IBinaryHashFunction[] entityIdFieldHashFunctions, ILockManager lockManager, TransactionContext txnCtx) {
- super(datasetId, entityIdFields, entityIdFieldHashFunctions, txnCtx, lockManager);
+ public PrimaryIndexSearchOperationCallback(int datasetId, int[] entityIdFields,
+ IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories, ILockManager lockManager,
+ TransactionContext txnCtx) {
+ super(datasetId, entityIdFields, entityIdFieldHashFunctionFactories, txnCtx, lockManager);
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index a124196..3e6dfcb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -16,13 +16,12 @@
package edu.uci.ics.asterix.transaction.management.opcallbacks;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionSubsystemProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -32,9 +31,10 @@
private static final long serialVersionUID = 1L;
- public PrimaryIndexSearchOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] entityIdFields,
- IBinaryHashFunction[] entityIdFieldHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider) {
- super(jobId, datasetId, entityIdFields, entityIdFieldHashFunctions, txnSubsystemProvider);
+ public PrimaryIndexSearchOperationCallbackFactory(JobId jobId, int datasetId, int[] entityIdFields,
+ IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories,
+ ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
+ super(jobId, datasetId, entityIdFields, entityIdFieldHashFunctionFactories, txnSubsystemProvider, resourceType);
}
@Override
@@ -43,8 +43,8 @@
TransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
TransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
- return new PrimaryIndexSearchOperationCallback(datasetId, primaryKeyFields, primaryKeyHashFunctions,
- txnSubsystem.getLockManager(), txnCtx);
+ return new PrimaryIndexSearchOperationCallback(datasetId, primaryKeyFields,
+ primaryKeyHashFunctionFactories, txnSubsystem.getLockManager(), txnCtx);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index d2fa889..8c1aac0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -18,11 +18,10 @@
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -37,15 +36,17 @@
IModificationOperationCallback {
protected final long resourceId;
+ protected final byte resourceType;
protected final IndexOperation indexOp;
protected final IndexOperation oldOp;
protected final TransactionSubsystem txnSubsystem;
- public SecondaryIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
- IBinaryHashFunction[] primaryKeyHashFunctions, TransactionContext txnCtx, ILockManager lockManager,
- TransactionSubsystem txnSubsystem, long resourceId, IndexOperation indexOp) {
- super(datasetId, primaryKeyFields, primaryKeyHashFunctions, txnCtx, lockManager);
+ public SecondaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
+ IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories, TransactionContext txnCtx,
+ ILockManager lockManager, TransactionSubsystem txnSubsystem, long resourceId, byte resourceType, IndexOperation indexOp) {
+ super(datasetId, primaryKeyFields, primaryKeyHashFunctionFactories, txnCtx, lockManager);
this.resourceId = resourceId;
+ this.resourceType = resourceType;
this.indexOp = indexOp;
oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT : IndexOperation.DELETE;
this.txnSubsystem = txnSubsystem;
@@ -58,7 +59,7 @@
@Override
public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
- IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, ResourceType.LSM_BTREE);
+ IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, resourceType);
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields, primaryKeyHashFunctions);
try {
logger.generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId, indexOp, after,
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 51ea0bf..3fede95 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -16,17 +16,18 @@
package edu.uci.ics.asterix.transaction.management.opcallbacks;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionSubsystemProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
public class SecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
IModificationOperationCallbackFactory {
@@ -34,21 +35,31 @@
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] primaryKeyFields,
- IBinaryHashFunction[] primaryKeyHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider,
- IndexOperation indexOp) {
- super(jobId, datasetId, primaryKeyFields, primaryKeyHashFunctions, txnSubsystemProvider);
+ public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+ IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories,
+ ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+ super(jobId, datasetId, primaryKeyFields, primaryKeyHashFunctionFactories, txnSubsystemProvider, resourceType);
this.indexOp = indexOp;
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(long resourceId, IHyracksTaskContext ctx)
+ public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource, IHyracksTaskContext ctx)
throws HyracksDataException {
TransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
+ TransactionalResourceRepository txnResourceRepository = txnSubsystem.getTransactionalResourceRepository();
+ ILSMIndex index = (ILSMIndex) txnResourceRepository.getTransactionalResource(resourceId);
+
+ //register the resource if it is not registered
+ if (index == null) {
+ txnSubsystem.getTransactionalResourceRepository().registerTransactionalResource(resourceId, resource);
+ index = (ILSMIndex) resource;
+ }
+
try {
TransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
return new SecondaryIndexModificationOperationCallback(datasetId, primaryKeyFields,
- primaryKeyHashFunctions, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, indexOp);
+ primaryKeyHashFunctionFactories, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
+ resourceType, indexOp);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
index 67c9ba0..16aefee 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
@@ -26,7 +26,7 @@
ISearchOperationCallback {
public SecondaryIndexSearchOperationCallback() {
- super(null, null, null, null, null);
+ super(-1, null, null, null, null);
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
index 5997bf1..fbaa12f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
@@ -45,9 +45,6 @@
if (resourceRepository.get(resourceId) == null) {
MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
resourceRepository.put(newMutableResourceId, resource);
-
- // wake up threads waiting for the resource
- resourceRepository.notifyAll();
}
}
}
@@ -56,9 +53,6 @@
synchronized (resourceMgrRepository) {
if (resourceMgrRepository.get(id) == null) {
resourceMgrRepository.put(id, resourceMgr);
-
- // wake up threads waiting for the resource manager
- resourceMgrRepository.notifyAll();
}
}
}
@@ -66,15 +60,6 @@
public Object getTransactionalResource(long resourceId) {
synchronized (resourceRepository) {
mutableResourceId.setId(resourceId);
- while (resourceRepository.get(mutableResourceId) == null) {
- try {
- resourceRepository.wait();
- } catch (InterruptedException ie) {
- ie.printStackTrace();
- break; // the thread might be interrupted due to other
- // failures occurring elsewhere, break from the loop
- }
- }
return resourceRepository.get(mutableResourceId);
}
}
@@ -83,7 +68,5 @@
synchronized (resourceMgrRepository) {
return resourceMgrRepository.get(id);
}
-
}
-
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
index 85d349b..1eeadff 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
@@ -28,8 +28,6 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
-
-
public class IndexLogger implements ILogger, ICloseable {
private final Map<Object, Object> jobId2ReusableLogContentObjectRepositoryMap = new ConcurrentHashMap<Object, Object>();
@@ -107,9 +105,15 @@
reusableLogContentObject.setOldValue(oldValue);
}
- int logContentSize = 4/*TupleFieldCount*/+ 1/*NewOperation*/+ 4/*newValueLength*/+ tupleWriter
- .bytesRequired(newValue);
- logContentSize += 1/*OldOperation*/+ 4/*oldValueLength*/+ tupleWriter.bytesRequired(oldValue);
+ int logContentSize = 4/*TupleFieldCount*/+ 1/*NewOperation*/+ 4/*newValueLength*/;
+ if (newValue != null) {
+ logContentSize += tupleWriter.bytesRequired(newValue);
+ }
+
+ logContentSize += 1/*OldOperation*/+ 4/*oldValueLength*/;
+ if (oldValue != null) {
+ logContentSize += tupleWriter.bytesRequired(oldValue);
+ }
txnSubsystem.getLogManager().log(LogType.UPDATE, context, datasetId, PKHashValue, resourceId, resourceType,
logContentSize, reusableLogContentObject, this, reusableLogContentObject.getLogicalLogLocator());
@@ -132,14 +136,18 @@
offset += 1;
//new tuple size
- tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getNewValue());
+ if (reusableLogContentObject.getNewValue() != null) {
+ tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getNewValue());
+ }
(logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, tupleSize);
offset += 4;
//new tuple
- tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer().getArray(),
- logicalLogLocator.getMemoryOffset() + offset);
- offset += tupleSize;
+ if (tupleSize != 0) {
+ tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer().getArray(),
+ logicalLogLocator.getMemoryOffset() + offset);
+ offset += tupleSize;
+ }
if (resourceType == ResourceType.LSM_BTREE) {
//old operation
@@ -149,13 +157,19 @@
if (reusableLogContentObject.getOldOperation() != IndexOperation.NOOP) {
//old tuple size
- tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getOldValue());
+ if (reusableLogContentObject.getOldValue() != null) {
+ tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getOldValue());
+ } else {
+ tupleSize = 0;
+ }
(logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, tupleSize);
offset += 4;
- //old tuple
- tupleWriter.writeTuple(reusableLogContentObject.getNewValue(),
- logicalLogLocator.getBuffer().getArray(), logicalLogLocator.getMemoryOffset() + offset);
+ if (tupleSize != 0) {
+ //old tuple
+ tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer()
+ .getArray(), logicalLogLocator.getMemoryOffset() + offset);
+ }
}
}
}
@@ -169,7 +183,7 @@
public void preLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject)
throws ACIDException {
}
-
+
/**
* Represents a utility class for generating log records corresponding to
* operations on a ITreeIndex implementation. A TreeLogger instance is thread
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 77a7b1b..9a097d6 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -27,14 +27,13 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
import edu.uci.ics.asterix.transaction.management.service.logging.FileUtil;
import edu.uci.ics.asterix.transaction.management.service.logging.IBuffer;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogCursor;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogFilter;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogManager;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogRecordHelper;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogActionType;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogRecordHelper;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
@@ -54,7 +53,7 @@
public class RecoveryManager implements IRecoveryManager {
private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
- private TransactionSubsystem transactionProvider;
+ private TransactionSubsystem txnSubsystem;
/**
* A file at a known location that contains the LSN of the last log record
@@ -66,7 +65,7 @@
private Map<Long, List<PhysicalLogLocator>> dirtyPagesTable;
public RecoveryManager(TransactionSubsystem TransactionProvider) throws ACIDException {
- this.transactionProvider = TransactionProvider;
+ this.txnSubsystem = TransactionProvider;
try {
FileUtil.createFileIfNotExists(checkpoint_record_file);
} catch (IOException ioe) {
@@ -79,14 +78,14 @@
}
private PhysicalLogLocator getBeginRecoveryLSN() throws ACIDException {
- return new PhysicalLogLocator(0, transactionProvider.getLogManager());
+ return new PhysicalLogLocator(0, txnSubsystem.getLogManager());
}
/**
* TODO:This method is currently not implemented completely.
*/
public SystemState startRecovery(boolean synchronous) throws IOException, ACIDException {
- ILogManager logManager = transactionProvider.getLogManager();
+ ILogManager logManager = txnSubsystem.getLogManager();
state = SystemState.RECOVERING;
transactionTable = new HashMap<Long, TransactionTableEntry>();
dirtyPagesTable = new HashMap<Long, List<PhysicalLogLocator>>();
@@ -110,13 +109,16 @@
break;
}
byte resourceMgrId = parser.getResourceMgrId(memLSN);
- IResourceManager resourceMgr = transactionProvider.getTransactionalResourceRepository()
+ IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
.getTransactionalResourceMgr(resourceMgrId);
+ //register resourceMgr if it is not registered.
if (resourceMgr == null) {
- throw new ACIDException("unknown resource mgr with id " + resourceMgrId);
- } else {
- resourceMgr.redo(parser, memLSN);
+ resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
+ txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
+ resourceMgrId, resourceMgr);
}
+ resourceMgr.redo(parser, memLSN);
+
writeCheckpointRecord(memLSN.getLsn());
}
state = SystemState.HEALTHY;
@@ -162,7 +164,7 @@
*/
@Override
public void rollbackTransaction(TransactionContext txnContext) throws ACIDException {
- ILogManager logManager = transactionProvider.getLogManager();
+ ILogManager logManager = txnSubsystem.getLogManager();
ILogRecordHelper parser = logManager.getLogRecordHelper();
// Obtain the last log record written by the transaction
@@ -209,16 +211,21 @@
}
// look up the repository to get the resource manager
- IResourceManager resourceMgr = transactionProvider.getTransactionalResourceRepository()
+ IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
.getTransactionalResourceMgr(resourceMgrId);
+
+ // register resourceMgr if it is not registered.
if (resourceMgr == null) {
- throw new ACIDException(txnContext, " unknown resource manager " + resourceMgrId);
- } else {
- resourceMgr.undo(parser, logLocator);
+ resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
+ txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
+ resourceMgrId, resourceMgr);
}
+ resourceMgr.undo(parser, logLocator);
break;
+
case LogType.COMMIT:
throw new ACIDException(txnContext, " cannot rollback commmitted transaction");
+
default:
throw new ACIDException("Unsupported LogType: " + logType);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
index ca5a71e..d4f15d9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
@@ -27,6 +27,9 @@
int fh = fieldHashFunctions[i].hash(tuple.getFieldData(primaryKeyFieldIdx),
tuple.getFieldStart(primaryKeyFieldIdx), tuple.getFieldLength(primaryKeyFieldIdx));
h = h * 31 + fh;
+ if (h < 0) {
+ h = h*(-1);
+ }
}
return h;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/ITransactionSubsystemProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/ITransactionSubsystemProvider.java
index b5eb7f8..857d8ae 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/ITransactionSubsystemProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/ITransactionSubsystemProvider.java
@@ -15,8 +15,10 @@
package edu.uci.ics.asterix.transaction.management.service.transaction;
+import java.io.Serializable;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-public interface ITransactionSubsystemProvider {
+public interface ITransactionSubsystemProvider extends Serializable{
public TransactionSubsystem getTransactionSubsystem(IHyracksTaskContext ctx);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index ebcd506..bbade35 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -27,6 +27,8 @@
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager.TransactionState;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
/**
@@ -93,6 +95,16 @@
}
}
+ public void decreaseActiveTransactionCountOnIndexes() throws HyracksDataException {
+ synchronized (indexes) {
+ for (int i = 0; i < indexes.size(); i++) {
+ ILSMIndex index = indexes.get(i);
+ IModificationOperationCallback modificationCallback = (IModificationOperationCallback) callbacks.get(i);
+ ((IndexOperationTracker) index.getOperationTracker()).completeOperation(null, modificationCallback);
+ }
+ }
+ }
+
public void setTransactionType(TransactionType transactionType) {
this.transactionType = transactionType;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index b93425c..a22a7a2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -21,6 +21,7 @@
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
/**
* An implementation of the @see ITransactionManager interface that provides
@@ -96,6 +97,12 @@
//for entity-level commit
if (PKHashVal != -1) {
transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
+ try {
+ //decrease the transaction reference count on index
+ txnContext.decreaseActiveTransactionCountOnIndexes();
+ } catch (HyracksDataException e) {
+ throw new ACIDException("failed to complete index operation", e);
+ }
return;
}