Fix creation of callback factories
Change-Id: Idbeacf5af01b77c5f81b59aa6acec9b13762d629
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1613
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 12f28c5..ac3caf3 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.active;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -28,6 +31,7 @@
public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable
implements IActiveRuntime {
+ private final Logger LOGGER = Logger.getLogger(ActiveSourceOperatorNodePushable.class.getName());
protected final IHyracksTaskContext ctx;
protected final ActiveManager activeManager;
/** A unique identifier for the runtime **/
@@ -79,6 +83,7 @@
@Override
public final void initialize() throws HyracksDataException {
+ LOGGER.log(Level.INFO, "initialize() called on ActiveSourceOperatorNodePushable");
activeManager.registerRuntime(this);
try {
// notify cc that runtime has been registered
@@ -86,15 +91,18 @@
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
start();
} catch (InterruptedException e) {
+ LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e);
Thread.currentThread().interrupt();
throw new HyracksDataException(e);
} catch (Exception e) {
+ LOGGER.log(Level.INFO, "initialize() failed on ActiveSourceOperatorNodePushable", e);
throw new HyracksDataException(e);
} finally {
synchronized (this) {
done = true;
notifyAll();
}
+ LOGGER.log(Level.INFO, "initialize() returning on ActiveSourceOperatorNodePushable");
}
}
@@ -105,11 +113,13 @@
ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED), null);
} catch (Exception e) {
+ LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e);
throw new HyracksDataException(e);
+ } finally {
+ LOGGER.log(Level.INFO, "deinitialize() returning on ActiveSourceOperatorNodePushable");
}
}
-
@Override
public final IFrameWriter getInputFrameWriter(int index) {
return null;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
index 2ff0617..4b28d42 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -49,13 +50,13 @@
IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
- ITupleFilterFactory tupleFilterFactory,
- IModificationOperationCallbackFactory modificationOpCallbackFactory, String indexName,
+ ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
+ ISearchOperationCallbackFactory searchCallbackFactory, String indexName,
IPageManagerFactory pageManagerFactory) {
super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
fieldPermutation, op, dataflowHelperFactory, tupleFilterFactory, modificationOpCallbackFactory,
- pageManagerFactory);
+ searchCallbackFactory, pageManagerFactory);
this.indexName = indexName;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 39ea54d..f8f4a0e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -31,14 +31,12 @@
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
import org.apache.asterix.common.dataflow.IApplicationContextInfo;
import org.apache.asterix.common.dataflow.LSMInvertedIndexInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
@@ -89,15 +87,6 @@
import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
-import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
-import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -146,7 +135,6 @@
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
@@ -157,7 +145,6 @@
public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {
private final IStorageComponentProvider storaegComponentProvider;
- private final ITransactionSubsystemProvider txnSubsystemProvider;
private final IMetadataPageManagerFactory metadataPageManagerFactory;
private final IPrimitiveValueProviderFactory primitiveValueProviderFactory;
private final StorageProperties storageProperties;
@@ -182,7 +169,6 @@
this.storaegComponentProvider = componentProvider;
storageProperties = AppContextInfo.INSTANCE.getStorageProperties();
libraryManager = AppContextInfo.INSTANCE.getLibraryManager();
- txnSubsystemProvider = componentProvider.getTransactionSubsystemProvider();
metadataPageManagerFactory = componentProvider.getMetadataPageManagerFactory();
primitiveValueProviderFactory = componentProvider.getPrimitiveValueProviderFactory();
}
@@ -457,7 +443,6 @@
boolean isSecondary = true;
int numSecondaryKeys = 0;
try {
- boolean temp = dataset.getDatasetDetails().isTemp();
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) {
@@ -521,27 +506,13 @@
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
spPc = getSplitProviderAndConstraints(dataset, theIndex.getIndexName());
-
- ISearchOperationCallbackFactory searchCallbackFactory;
- if (isSecondary) {
- searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
- : new SecondaryIndexSearchOperationCallbackFactory();
- } else {
- int datasetId = dataset.getDatasetId();
- int[] primaryKeyFields = new int[numPrimaryKeys];
- for (int i = 0; i < numPrimaryKeys; i++) {
- primaryKeyFields[i] = i;
- }
-
- /**
- * Due to the read-committed isolation level,
- * we may acquire very short duration lock(i.e., instant lock) for readers.
- */
- searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
- : new PrimaryIndexInstantSearchOperationCallbackFactory(
- ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(),
- datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ int[] primaryKeyFields = new int[numPrimaryKeys];
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ primaryKeyFields[i] = i;
}
+
+ ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+ storaegComponentProvider, theIndex, jobId, IndexOperation.SEARCH, primaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
RuntimeComponentsProvider rtcProvider = RuntimeComponentsProvider.RUNTIME_PROVIDER;
@@ -577,8 +548,6 @@
try {
ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
-
- boolean temp = dataset.getDatasetDetails().isTemp();
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
if (secondaryIndex == null) {
@@ -646,8 +615,13 @@
}
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
- ISearchOperationCallbackFactory searchCallbackFactory =
- temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory();
+ int[] primaryKeyFields = new int[numPrimaryKeys];
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ primaryKeyFields[i] = i;
+ }
+
+ ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, IndexOperation.SEARCH, primaryKeyFields);
RTreeSearchOperatorDescriptor rtreeSearchOp;
IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this,
secondaryIndex, recType, metaType, compactionInfo.first, compactionInfo.second);
@@ -998,7 +972,6 @@
throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e);
}
- boolean temp = datasetDetails.isTemp();
String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
Index fileIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), fileIndexName);
@@ -1011,8 +984,8 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
dataset.getDatasetName(), fileIndexName, false);
- ISearchOperationCallbackFactory searchOpCallbackFactory =
- temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory();
+ ISearchOperationCallbackFactory searchOpCallbackFactory = dataset
+ .getSearchCallbackFactory(storaegComponentProvider, fileIndex, jobId, IndexOperation.SEARCH, null);
// Create the operator
ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
@@ -1085,20 +1058,16 @@
getSplitProviderAndConstraints(dataset);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
int[] primaryKeyFields = new int[numKeys];
for (i = 0; i < numKeys; i++) {
primaryKeyFields[i] = i;
}
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- primaryKeyFields, txnSubsystemProvider, Operation.UPSERT, ResourceType.LSM_BTREE)
- : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
- Operation.UPSERT, ResourceType.LSM_BTREE);
+ IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+ storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
- LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
- jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+ storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1281,18 +1250,14 @@
getSplitProviderAndConstraints(dataset);
// prepare callback
- int datasetId = dataset.getDatasetId();
int[] primaryKeyFields = new int[numKeys];
for (i = 0; i < numKeys; i++) {
primaryKeyFields[i] = i;
}
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(
- ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId,
- primaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), ResourceType.LSM_BTREE)
- : new PrimaryIndexModificationOperationCallbackFactory(
- ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId,
- primaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), ResourceType.LSM_BTREE);
+ IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+ storaegComponentProvider, primaryIndex, jobId, indexOp, primaryKeyFields);
+ ISearchOperationCallbackFactory searchCallbackFactory = dataset
+ .getSearchCallbackFactory(storaegComponentProvider, primaryIndex, jobId, indexOp, primaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1310,7 +1275,7 @@
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, null, true,
- indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+ indexName, null, modificationCallbackFactory, searchCallbackFactory,
metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
@@ -1483,15 +1448,10 @@
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_BTREE)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_BTREE);
-
+ IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
+ ISearchOperationCallbackFactory searchOpCallbackFactory = dataset.getSearchCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType,
@@ -1508,13 +1468,13 @@
op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false,
- indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
- prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
+ indexName, null, modificationCallbackFactory, searchOpCallbackFactory, prevFieldPermutation,
+ metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, filterFactory,
- false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+ false, indexName, null, modificationCallbackFactory, searchOpCallbackFactory,
metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
@@ -1641,15 +1601,10 @@
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_RTREE)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_RTREE);
-
+ IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
+ ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this,
@@ -1666,13 +1621,13 @@
op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory, filterFactory, false,
- indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
- prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
+ indexName, null, modificationCallbackFactory, searchCallbackFactory, prevFieldPermutation,
+ metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory, filterFactory,
- false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+ false, indexName, null, modificationCallbackFactory, searchCallbackFactory,
metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
@@ -1851,14 +1806,10 @@
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_INVERTED_INDEX)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_INVERTED_INDEX);
+ IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
+ ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
IIndexDataflowHelperFactory indexDataFlowFactory = dataset.getIndexDataflowHelperFactory(this,
@@ -1875,14 +1826,16 @@
op = new LSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
- fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName,
+ fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory,
+ searchCallbackFactory, indexName,
prevFieldPermutation, metadataPageManagerFactory);
} else {
op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
appContext.getStorageManager(), splitsAndConstraint.first,
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
- indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName,
+ indexDataFlowFactory, filterFactory, modificationCallbackFactory, searchCallbackFactory,
+ indexName,
metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 82fe036..e0607a6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -68,6 +68,7 @@
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
@@ -519,9 +520,13 @@
int[] primaryKeyFields) throws AlgebricksException {
if (getDatasetDetails().isTemp()) {
return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
- primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
- index.resourceType())
+ ? index.isPrimaryIndex()
+ ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ primaryKeyFields, componentProvider.getTransactionSubsystemProvider(),
+ Operation.get(op), index.resourceType())
+ : new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
+ primaryKeyFields, componentProvider.getTransactionSubsystemProvider(),
+ Operation.get(op), index.resourceType())
: NoOpOperationCallbackFactory.INSTANCE;
} else if (index.isPrimaryIndex()) {
return op == IndexOperation.UPSERT
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
index f1547a8..02c1908 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
@@ -31,14 +31,14 @@
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import org.apache.hyracks.storage.common.IStorageManager;
-public class LSMInvertedIndexUpsertOperatorDescriptor
- extends LSMInvertedIndexInsertDeleteOperatorDescriptor {
+public class LSMInvertedIndexUpsertOperatorDescriptor extends LSMInvertedIndexInsertDeleteOperatorDescriptor {
private static final long serialVersionUID = 1L;
private final int[] prevFieldPermutation;
@@ -50,18 +50,19 @@
IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory,
ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
- String indexName, int[] prevFieldPermutation, IPageManagerFactory pageManagerFactory) {
+ ISearchOperationCallbackFactory searchCallbackFactory, String indexName, int[] prevFieldPermutation,
+ IPageManagerFactory pageManagerFactory) {
super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
fieldPermutation, IndexOperation.UPSERT, dataflowHelperFactory, tupleFilterFactory,
- modificationOpCallbackFactory, indexName, pageManagerFactory);
+ modificationOpCallbackFactory, searchCallbackFactory, indexName, pageManagerFactory);
this.prevFieldPermutation = prevFieldPermutation;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new LSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
- recordDescProvider, prevFieldPermutation);
+ return new LSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation, recordDescProvider,
+ prevFieldPermutation);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
index b37ecae..e6cfd2a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
@@ -54,13 +54,13 @@
IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory,
boolean isPrimary, String indexName, IMissingWriterFactory missingWriterFactory,
- IModificationOperationCallbackFactory modificationOpCallbackProvider,
+ IModificationOperationCallbackFactory modificationOpCallbackFactory,
ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation,
IPageManagerFactory pageManagerFactory, IFrameOperationCallbackFactory frameOpCallbackFactory) {
super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT,
dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, missingWriterFactory,
- modificationOpCallbackProvider, searchOpCallbackProvider, pageManagerFactory);
+ modificationOpCallbackFactory, searchOpCallbackProvider, pageManagerFactory);
this.prevValuePermutation = prevValuePermutation;
this.frameOpCallbackFactory = frameOpCallbackFactory;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
index e9c0e5f..da3e986 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
@@ -18,20 +18,27 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IndexException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
-public enum SynchronousScheduler implements ILSMIOOperationScheduler {
- INSTANCE;
+public class SynchronousScheduler implements ILSMIOOperationScheduler {
+ private static final Logger LOGGER = Logger.getLogger(SynchronousScheduler.class.getName());
+ public static final SynchronousScheduler INSTANCE = new SynchronousScheduler();
+
+ private SynchronousScheduler() {
+ }
@Override
public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
try {
operation.call();
- } catch (IndexException e) {
- throw new HyracksDataException(e);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "IO Operation failed", e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
index 46201d5..a342370 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
@@ -31,9 +31,9 @@
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
@@ -55,11 +55,12 @@
int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
ITupleFilterFactory tupleFilterFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory,
+ ISearchOperationCallbackFactory searchCallbackFactory,
IPageManagerFactory pageManagerFactory) {
super(spec, 1, 1, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
dataflowHelperFactory, tupleFilterFactory, false, false,
- null, NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+ null, NoOpLocalResourceFactoryProvider.INSTANCE, searchCallbackFactory,
modificationOpCallbackFactory,
pageManagerFactory);
this.fieldPermutation = fieldPermutation;