[ASTERIXDB-2167][TX][RT] Remove TxnId from JobSpecification
- user model changes: no
- storage format changes: no
- interface changes: IJobEventListenerFactory
details:
- Remove the TxnId from the compiled job specification
- This enables one job spec to be used by multiple jobs/transactions
- Runtime operators who need the TxnId will pull it from the EventListener
Change-Id: I9526d50b31aebc3bf971d95ba3edf29c0c1066a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2154
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index abd18aa..09092ff 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -21,7 +21,6 @@
import java.util.List;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -43,12 +42,10 @@
public class CommitPOperator extends AbstractPhysicalOperator {
private final List<LogicalVariable> primaryKeyLogicalVars;
- private final TxnId txnId;
private final Dataset dataset;
private final boolean isSink;
- public CommitPOperator(TxnId txnId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
- this.txnId = txnId;
+ public CommitPOperator(Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
this.dataset = dataset;
this.primaryKeyLogicalVars = primaryKeyLogicalVars;
this.isSink = isSink;
@@ -87,7 +84,7 @@
int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
//get dataset splits
- IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields,
+ IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, primaryKeyFields,
isSink);
builder.contributeMicroOperator(op, runtime, recDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index c941320..c3cc0ae 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -30,7 +30,6 @@
import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
import org.apache.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -158,7 +157,6 @@
jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory, searchModifierFactory,
retainInput, retainMissing, context.getMissingWriterFactory(),
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
- ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getTxnId(),
IndexOperation.SEARCH, null),
minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
propagateIndexFilter);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 61339bf..7dfe161 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -23,9 +23,7 @@
import org.apache.asterix.algebra.operators.CommitOperator;
import org.apache.asterix.algebra.operators.physical.CommitPOperator;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -99,14 +97,10 @@
primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId()));
}
- //get TxnId(TransactorId)
- MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
- TxnId txnId = mp.getTxnId();
-
//create the logical and physical operator
CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, isSink);
CommitPOperator commitPOperator =
- new CommitPOperator(txnId, dataset, primaryKeyLogicalVars, isSink);
+ new CommitPOperator(dataset, primaryKeyLogicalVars, isSink);
commitOperator.setPhysicalOperator(commitPOperator);
//create ExtensionOperator and put the commitOperator in it.
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index e42b5e5..9dddda4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -279,7 +279,7 @@
Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = new HashMap<>();
Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>();
Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
- List<TxnId> txnIds = new ArrayList<>();
+ Map<Integer, TxnId> txnIdMap = new HashMap<>();
FeedMetaOperatorDescriptor metaOp;
for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
@@ -415,11 +415,16 @@
for (OperatorDescriptorId root : subJob.getRoots()) {
jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root)));
}
- txnIds.add(((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId());
+ int datasetId = metadataProvider
+ .findDataset(curFeedConnection.getDataverseName(), curFeedConnection.getDatasetName())
+ .getDatasetId();
+ TxnId txnId = ((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId(datasetId);
+ txnIdMap.put(datasetId, txnId);
}
// jobEventListenerFactory
- jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(txnIds, true));
+ jobSpec.setJobletEventListenerFactory(
+ new MultiTransactionJobletEventListenerFactory(txnIdMap, true));
// useConnectorSchedulingPolicy
jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling());
// connectorAssignmentPolicy
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 42f577f..a04c994 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -283,13 +283,13 @@
IOperatorDescriptor starter = DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider);
// Creates primary index scan op.
- IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, txnId);
+ IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source);
// Creates secondary BTree upsert op.
IOperatorDescriptor upsertOp = createPrimaryIndexUpsertOp(spec, metadataProvider, source, target);
// The final commit operator.
- IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, txnId, target);
+ IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, target);
// Connects empty-tuple-source and scan.
spec.connect(new OneToOneConnectorDescriptor(spec), starter, 0, primaryScanOp, 0);
@@ -326,11 +326,11 @@
// Creates the commit operator for populating the target dataset.
private static IOperatorDescriptor createUpsertCommitOp(JobSpecification spec, MetadataProvider metadataProvider,
- TxnId txnId, Dataset target) throws AlgebricksException {
+ Dataset target) throws AlgebricksException {
int[] primaryKeyFields = getPrimaryKeyPermutationForUpsert(target);
return new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
new IPushRuntimeFactory[] {
- target.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields, true) },
+ target.getCommitRuntimeFactory(metadataProvider, primaryKeyFields, true) },
new RecordDescriptor[] { target.getPrimaryRecordDescriptor(metadataProvider) });
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 352a5f8..a1c2ee6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -183,7 +183,7 @@
mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
IndexOperation op = IndexOperation.INSERT;
IModificationOperationCallbackFactory modOpCallbackFactory =
- new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), dataset.getDatasetId(),
+ new PrimaryIndexModificationOperationCallbackFactory(dataset.getDatasetId(),
primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
ResourceType.LSM_BTREE);
IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
@@ -614,9 +614,9 @@
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, keyTypes, recordType, metaType,
mergePolicy.first, mergePolicy.second, filterFields, keyIndexes, keyIndicators);
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
- storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+ storageComponentProvider, primaryIndexInfo.index, IndexOperation.UPSERT, keyIndexes);
ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
- storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+ storageComponentProvider, primaryIndexInfo.index, IndexOperation.UPSERT, keyIndexes);
IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
index 0f37b13..acb3ae8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
@@ -22,9 +22,9 @@
import org.apache.hyracks.api.job.IJobletEventListenerFactory;
/**
- * an interface for JobEventListenerFactories to add Asterix transaction JobId getter
+ * an interface for JobEventListenerFactories to add Asterix txnId getter
*/
public interface IJobEventListenerFactory extends IJobletEventListenerFactory {
- TxnId getTxnId(TxnId compiledTxnId);
+ TxnId getTxnId(int datasetId);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
index d2b1276..ce6671b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
@@ -26,15 +26,13 @@
public abstract class AbstractOperationCallbackFactory implements Serializable {
private static final long serialVersionUID = 1L;
- protected final TxnId txnId;
protected final int datasetId;
protected final int[] primaryKeyFields;
protected final ITransactionSubsystemProvider txnSubsystemProvider;
protected final byte resourceType;
- public AbstractOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+ public AbstractOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
- this.txnId = txnId;
this.datasetId = datasetId;
this.primaryKeyFields = primaryKeyFields;
this.txnSubsystemProvider = txnSubsystemProvider;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 305cdfa..1e0d597 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -84,7 +84,6 @@
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
import org.apache.asterix.runtime.formats.FormatUtils;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
@@ -446,7 +445,7 @@
}
ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
- storageComponentProvider, theIndex, txnId, IndexOperation.SEARCH, primaryKeyFields);
+ storageComponentProvider, theIndex, IndexOperation.SEARCH, primaryKeyFields);
IStorageManager storageManager = getStorageComponentProvider().getStorageManager();
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first);
BTreeSearchOperatorDescriptor btreeSearchOp;
@@ -485,7 +484,7 @@
}
ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
- storageComponentProvider, secondaryIndex, txnId, IndexOperation.SEARCH, primaryKeyFields);
+ storageComponentProvider, secondaryIndex, IndexOperation.SEARCH, primaryKeyFields);
RTreeSearchOperatorDescriptor rtreeSearchOp;
IIndexDataflowHelperFactory indexDataflowHelperFactory =
new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first);
@@ -789,7 +788,7 @@
// files index
RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
ISearchOperationCallbackFactory searchOpCallbackFactory = dataset
- .getSearchCallbackFactory(storageComponentProvider, fileIndex, txnId, IndexOperation.SEARCH, null);
+ .getSearchCallbackFactory(storageComponentProvider, fileIndex, IndexOperation.SEARCH, null);
// Create the operator
ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
outRecDesc, indexDataflowHelperFactory, searchOpCallbackFactory,
@@ -959,7 +958,7 @@
primaryKeyFields[i] = i;
}
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
- storageComponentProvider, primaryIndex, txnId, indexOp, primaryKeyFields);
+ storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
IIndexDataflowHelperFactory idfh =
new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
IOperatorDescriptor op;
@@ -1081,9 +1080,8 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
// prepare callback
- TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
- storageComponentProvider, secondaryIndex, txnId, indexOp, modificationCallbackPrimaryKeyFields);
+ storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
IOperatorDescriptor op;
@@ -1179,9 +1177,8 @@
getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
// prepare callback
- TxnId planTxnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
- storageComponentProvider, secondaryIndex, planTxnId, indexOp, modificationCallbackPrimaryKeyFields);
+ storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
IIndexDataflowHelperFactory indexDataflowHelperFactory =
new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
IOperatorDescriptor op;
@@ -1289,9 +1286,8 @@
getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
// prepare callback
- TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
- storageComponentProvider, secondaryIndex, txnId, indexOp, modificationCallbackPrimaryKeyFields);
+ storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
IIndexDataflowHelperFactory indexDataFlowFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
IOperatorDescriptor op;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index e6c0de8..2386d77 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -42,7 +42,6 @@
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.common.utils.JobUtils.ProgressState;
import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -541,8 +540,6 @@
*
* @param index
* the index
- * @param txnId
- * the job id being compiled
* @param op
* the operation this search is part of
* @param primaryKeyFields
@@ -553,21 +550,21 @@
* if the callback factory could not be created
*/
public ISearchOperationCallbackFactory getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider,
- Index index, TxnId txnId, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException {
+ Index index, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException {
if (index.isPrimaryIndex()) {
/*
* Due to the read-committed isolation level,
* we may acquire very short duration lock(i.e., instant lock) for readers.
*/
- return (op == IndexOperation.UPSERT) ?
- new LockThenSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
- storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE) :
- new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+ return (op == IndexOperation.UPSERT)
+ ? new LockThenSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
+ storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE)
+ : new PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE);
} else if (index.getKeyFieldNames().isEmpty()) {
// this is the case where the index is secondary primary index and locking is required
// since the secondary primary index replaces the dataset index (which locks)
- return new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+ return new PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE);
}
return new SecondaryIndexSearchOperationCallbackFactory();
@@ -578,8 +575,6 @@
*
* @param index
* the index
- * @param txnId
- * the job id of the job being compiled
* @param op
* the operation performed for this callback
* @param primaryKeyFields
@@ -590,24 +585,23 @@
* If the callback factory could not be created
*/
public IModificationOperationCallbackFactory getModificationCallbackFactory(
- IStorageComponentProvider componentProvider, Index index, TxnId txnId, IndexOperation op,
+ IStorageComponentProvider componentProvider, Index index, IndexOperation op,
int[] primaryKeyFields) throws AlgebricksException {
if (index.isPrimaryIndex()) {
- return op == IndexOperation.UPSERT ?
- new UpsertOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+ return op == IndexOperation.UPSERT ? new UpsertOperationCallbackFactory(getDatasetId(), primaryKeyFields,
componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
- index.resourceType()) :
- op == IndexOperation.DELETE || op == IndexOperation.INSERT ?
- new PrimaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(),
+ index.resourceType())
+ : op == IndexOperation.DELETE || op == IndexOperation.INSERT
+ ? new PrimaryIndexModificationOperationCallbackFactory(getDatasetId(),
primaryKeyFields, componentProvider.getTransactionSubsystemProvider(),
- Operation.get(op), index.resourceType()) :
- NoOpOperationCallbackFactory.INSTANCE;
+ Operation.get(op), index.resourceType())
+ : NoOpOperationCallbackFactory.INSTANCE;
} else {
- return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT ?
- new SecondaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+ return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
+ ? new SecondaryIndexModificationOperationCallbackFactory(getDatasetId(), primaryKeyFields,
componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
- index.resourceType()) :
- NoOpOperationCallbackFactory.INSTANCE;
+ index.resourceType())
+ : NoOpOperationCallbackFactory.INSTANCE;
}
}
@@ -651,8 +645,6 @@
*
* @param metadataProvider,
* the metadata provider.
- * @param txnId,
- * the AsterixDB job id for transaction management.
* @param primaryKeyFieldPermutation,
* the primary key field permutation according to the input.
* @param isSink,
@@ -660,10 +652,10 @@
* @return the commit runtime factory for inserting/upserting/deleting operations on this dataset.
* @throws AlgebricksException
*/
- public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, TxnId txnId,
+ public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider,
int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException {
int[] datasetPartitions = getDatasetPartitions(metadataProvider);
- return new CommitRuntimeFactory(txnId, datasetId, primaryKeyFieldPermutation,
+ return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation,
metadataProvider.isWriteTransaction(), datasetPartitions, isSink);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 5973c06..3d05c0e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -38,7 +38,6 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -57,7 +56,6 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
@@ -335,13 +333,11 @@
* the metadata provider.
* @param dataset,
* the dataset to scan.
- * @param txnId,
- * the AsterixDB job id for transaction management.
* @return a primary index scan operator.
* @throws AlgebricksException
*/
public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider,
- Dataset dataset, TxnId txnId) throws AlgebricksException {
+ Dataset dataset) throws AlgebricksException {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset);
IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first;
@@ -351,8 +347,8 @@
// +Infinity
int[] highKeyFields = null;
ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
- ISearchOperationCallbackFactory searchCallbackFactory =
- new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, dataset.getDatasetId(),
+ ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(
+ dataset.getDatasetId(),
dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider,
IRecoveryManager.ResourceType.LSM_BTREE);
IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
@@ -396,7 +392,6 @@
metadataProvider.getSplitProviderAndConstraints(dataset);
// prepare callback
- TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(null);
int[] primaryKeyFields = new int[numKeys];
for (int i = 0; i < numKeys; i++) {
primaryKeyFields[i] = i;
@@ -405,9 +400,9 @@
metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
- storageComponentProvider, primaryIndex, txnId, IndexOperation.UPSERT, primaryKeyFields);
+ storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
- storageComponentProvider, primaryIndex, txnId, IndexOperation.UPSERT, primaryKeyFields);
+ storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
IIndexDataflowHelperFactory idfh =
new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
LSMPrimaryUpsertOperatorDescriptor op;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index e6a24e3..2ebfe78 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.metadata.utils;
-import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.*;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
import java.util.EnumSet;
import java.util.List;
@@ -161,13 +161,12 @@
* the metadata provider.
* @return the AsterixDB job id for transaction management.
*/
- public static TxnId bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) {
+ public static void bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) {
TxnId txnId = TxnIdFactory.create();
metadataProvider.setTxnId(txnId);
boolean isWriteTransaction = metadataProvider.isWriteTransaction();
IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, isWriteTransaction);
spec.setJobletEventListenerFactory(jobEventListenerFactory);
- return txnId;
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 8f70f21..41def96 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -22,7 +22,6 @@
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -129,11 +128,10 @@
// Create dummy key provider for feeding the primary index scan.
IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset,
metadataProvider);
- TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+ IndexUtil.bindJobEventListener(spec, metadataProvider);
// Create primary index scan op.
- IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
- txnId);
+ IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
// Assign op.
IOperatorDescriptor sourceOp = primaryScanOp;
@@ -199,7 +197,6 @@
* ====== ========= ........ ........
*/
@Override
- @SuppressWarnings("rawtypes")
protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
int numSecondaryKeys = index.getKeyFieldNames().size();
secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
index 89bd4b1..7791cad 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
@@ -21,7 +21,6 @@
import java.util.List;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
@@ -72,14 +71,14 @@
// only handle internal datasets
// Create dummy key provider for feeding the primary index scan.
- TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+ IndexUtil.bindJobEventListener(spec, metadataProvider);
// Create dummy key provider for feeding the primary index scan.
IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
// Create primary index scan op.
IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
- getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId);
+ getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
// Assign op.
IOperatorDescriptor sourceOp = primaryScanOp;
@@ -124,7 +123,6 @@
}
@Override
- @SuppressWarnings("rawtypes")
protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
int numSecondaryKeys = index.getKeyFieldNames().size();
secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
index 93cc11d..b91d65f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
@@ -21,7 +21,6 @@
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -79,7 +78,6 @@
}
@Override
- @SuppressWarnings("rawtypes")
protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
int numSecondaryKeys = index.getKeyFieldNames().size();
IndexType indexType = index.getIndexType();
@@ -206,14 +204,14 @@
@Override
public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+ IndexUtil.bindJobEventListener(spec, metadataProvider);
// Create dummy key provider for feeding the primary index scan.
IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
// Create primary index scan op.
IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
- getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId);
+ getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
IOperatorDescriptor sourceOp = primaryScanOp;
boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
index 1333493..bf5178c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
@@ -23,7 +23,6 @@
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -52,7 +51,6 @@
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-@SuppressWarnings("rawtypes")
public class SecondaryCorrelatedRTreeOperationsHelper extends SecondaryCorrelatedTreeIndexOperationsHelper {
protected IPrimitiveValueProviderFactory[] valueProviderFactories;
@@ -184,11 +182,11 @@
// Create dummy key provider for feeding the primary index scan.
IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
- TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+ IndexUtil.bindJobEventListener(spec, metadataProvider);
// Create primary index scan op.
IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
- getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId);
+ getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
// Assign op.
IOperatorDescriptor sourceOp = primaryScanOp;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
index 2a4a952..0a772fa 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
@@ -24,7 +24,6 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -102,7 +101,6 @@
}
protected RecordDescriptor getTaggedRecordDescriptor(RecordDescriptor recDescriptor) {
- @SuppressWarnings("rawtypes")
ISerializerDeserializer[] fields =
new ISerializerDeserializer[recDescriptor.getFields().length + NUM_TAG_FIELDS];
ITypeTraits[] traits = null;
@@ -273,10 +271,10 @@
}
protected IOperatorDescriptor createPrimaryIndexScanDiskComponentsOp(JobSpecification spec,
- MetadataProvider metadataProvider, RecordDescriptor outRecDesc, TxnId txnId) throws AlgebricksException {
+ MetadataProvider metadataProvider, RecordDescriptor outRecDesc) throws AlgebricksException {
ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
- ISearchOperationCallbackFactory searchCallbackFactory =
- new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, dataset.getDatasetId(),
+ ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(
+ dataset.getDatasetId(),
dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider,
IRecoveryManager.ResourceType.LSM_BTREE);
IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index 3626f16..1c9eb74 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -22,7 +22,6 @@
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -82,7 +81,6 @@
}
@Override
- @SuppressWarnings("rawtypes")
protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
int numSecondaryKeys = index.getKeyFieldNames().size();
IndexType indexType = index.getIndexType();
@@ -208,14 +206,14 @@
@Override
public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+ IndexUtil.bindJobEventListener(spec, metadataProvider);
// Create dummy key provider for feeding the primary index scan.
IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
// Create primary index scan op.
IOperatorDescriptor primaryScanOp =
- DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, txnId);
+ DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
IOperatorDescriptor sourceOp = primaryScanOp;
boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
index 613df21..8e6e0e9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
@@ -23,7 +23,6 @@
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -58,7 +57,6 @@
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-@SuppressWarnings("rawtypes")
public class SecondaryRTreeOperationsHelper extends SecondaryTreeIndexOperationsHelper {
protected IPrimitiveValueProviderFactory[] valueProviderFactories;
@@ -201,11 +199,10 @@
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
// Create dummy key provider for feeding the primary index scan.
IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
- TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+ IndexUtil.bindJobEventListener(spec, metadataProvider);
// Create primary index scan op.
- IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
- txnId);
+ IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
// Assign op.
IOperatorDescriptor sourceOp = primaryScanOp;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index d3c3fe7..0de61ff 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -48,12 +48,8 @@
this.transactionalWrite = transactionalWrite;
}
- public TxnId getTxnId() {
- return txnId;
- }
-
@Override
- public TxnId getTxnId(TxnId compiledTxnId) {
+ public TxnId getTxnId(int datasetId) {
return txnId;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index bfe1925..656ea09 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.runtime.job.listener;
-import java.util.List;
+import java.util.Map;
import org.apache.asterix.common.api.IJobEventListenerFactory;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -40,23 +40,22 @@
public class MultiTransactionJobletEventListenerFactory implements IJobEventListenerFactory {
private static final long serialVersionUID = 1L;
- private final List<TxnId> txnIds;
+ private final Map<Integer, TxnId> txnIdMap;
private final boolean transactionalWrite;
- public MultiTransactionJobletEventListenerFactory(List<TxnId> txnIds, boolean transactionalWrite) {
- this.txnIds = txnIds;
+ public MultiTransactionJobletEventListenerFactory(Map<Integer, TxnId> txnIdMap, boolean transactionalWrite) {
+ this.txnIdMap = txnIdMap;
this.transactionalWrite = transactionalWrite;
}
- //TODO: Enable this factory to be usable for Deployed Jobs
@Override
- public TxnId getTxnId(TxnId compiledTxnId) {
- return compiledTxnId;
+ public TxnId getTxnId(int datasetId) {
+ return txnIdMap.get(datasetId);
}
@Override
public IJobletEventListenerFactory copyFactory() {
- return new MultiTransactionJobletEventListenerFactory(txnIds, transactionalWrite);
+ return new MultiTransactionJobletEventListenerFactory(txnIdMap, transactionalWrite);
}
@Override
@@ -74,13 +73,13 @@
ITransactionManager txnManager =
((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext())
.getTransactionSubsystem().getTransactionManager();
- for (TxnId txnId : txnIds) {
- ITransactionContext txnContext = txnManager.getTransactionContext(txnId);
+ for (TxnId subTxnId : txnIdMap.values()) {
+ ITransactionContext txnContext = txnManager.getTransactionContext(subTxnId);
txnContext.setWriteTxn(transactionalWrite);
if (jobStatus != JobStatus.FAILURE) {
- txnManager.commitTransaction(txnId);
+ txnManager.commitTransaction(subTxnId);
} else {
- txnManager.abortTransaction(txnId);
+ txnManager.abortTransaction(subTxnId);
}
}
} catch (ACIDException e) {
@@ -93,9 +92,10 @@
try {
TransactionOptions options =
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
- for (TxnId txnId : txnIds) {
+ for (TxnId subTxnId : txnIdMap.values()) {
((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext())
- .getTransactionSubsystem().getTransactionManager().beginTransaction(txnId, options);
+ .getTransactionSubsystem().getTransactionManager()
+ .beginTransaction(subTxnId, options);
}
} catch (ACIDException e) {
throw new Error(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 9f96263..1346b76 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -25,7 +25,6 @@
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,9 +36,9 @@
private static final long serialVersionUID = 1L;
- public LockThenSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields,
+ public LockThenSearchOperationCallbackFactory(int datasetId, int[] entityIdFields,
ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
- super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+ super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
}
@Override
@@ -49,7 +48,7 @@
try {
IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
- .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+ .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
return new LockThenSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields,
txnSubsystem, txnCtx, operatorNodePushable);
} catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index f9c8e3c..d4242bf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,9 +38,9 @@
private static final long serialVersionUID = 1L;
- public PrimaryIndexInstantSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields,
+ public PrimaryIndexInstantSearchOperationCallbackFactory(int datasetId, int[] entityIdFields,
ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
- super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+ super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
}
@Override
@@ -51,7 +50,7 @@
try {
IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
- .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+ .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), resourceId,
primaryKeyFields, txnSubsystem.getLockManager(), txnCtx);
} catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 8f5e386..97fd7ce 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -49,9 +48,9 @@
private static final long serialVersionUID = 1L;
private final Operation indexOp;
- public PrimaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+ public PrimaryIndexModificationOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
- super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+ super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
this.indexOp = indexOp;
}
@@ -69,7 +68,7 @@
try {
IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
- .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+ .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 64cbbc9..72e48bf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,9 +38,9 @@
private static final long serialVersionUID = 1L;
- public PrimaryIndexSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields,
+ public PrimaryIndexSearchOperationCallbackFactory(int datasetId, int[] entityIdFields,
ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
- super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+ super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
}
@Override
@@ -51,7 +50,7 @@
try {
IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
- .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+ .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields,
txnSubsystem.getLockManager(), txnCtx);
} catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 3fc42c9..0c20ee9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -45,9 +44,9 @@
private static final long serialVersionUID = 1L;
private final Operation indexOp;
- public SecondaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+ public SecondaryIndexModificationOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
- super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+ super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
this.indexOp = indexOp;
}
@@ -65,7 +64,7 @@
try {
IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
- .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+ .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index da4aab8..c2f512f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -44,9 +43,9 @@
private static final long serialVersionUID = 1L;
protected final Operation indexOp;
- public UpsertOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+ public UpsertOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
- super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+ super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
this.indexOp = indexOp;
}
@@ -65,7 +64,7 @@
try {
IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
- .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+ .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId),
primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
aResource.getPartition(), resourceType, indexOp);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 91db197..445ad4a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -20,7 +20,6 @@
package org.apache.asterix.transaction.management.runtime;
import org.apache.asterix.common.api.IJobEventListenerFactory;
-import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -31,16 +30,14 @@
private static final long serialVersionUID = 1L;
- protected final TxnId txnId;
protected final int datasetId;
protected final int[] primaryKeyFields;
protected final boolean isWriteTransaction;
protected int[] datasetPartitions;
protected final boolean isSink;
- public CommitRuntimeFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, boolean isWriteTransaction,
+ public CommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean isWriteTransaction,
int[] datasetPartitions, boolean isSink) {
- this.txnId = txnId;
this.datasetId = datasetId;
this.primaryKeyFields = primaryKeyFields;
this.isWriteTransaction = isWriteTransaction;
@@ -56,7 +53,8 @@
@Override
public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
- return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(txnId), datasetId, primaryKeyFields,
- isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+ return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId), datasetId,
+ primaryKeyFields, isWriteTransaction,
+ datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
}
}