Implemented the IntroduceInstantLockSearchCallbackRule which is statically going to inspect the unnest_map and data_source_scan operators in the plan. If there is only a single access to the primary index, then an instant-lock search callback will be passed to the LSM-BTree at runtime. Otherwise, the usual search callback (the one that hold locks until the job commit) will be passed to the LSM-BTree. All test cases pass.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1476 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index f0880ec..efe01a5 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -47,9 +47,11 @@
private final List<LogicalVariable> highKeyVarList;
private final boolean isPrimaryIndex;
private final boolean isEqCondition;
+ private Object implConfig;
public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
- boolean isPrimaryIndex, boolean isEqCondition, List<LogicalVariable> lowKeyVarList, List<LogicalVariable> highKeyVarList) {
+ boolean isPrimaryIndex, boolean isEqCondition, List<LogicalVariable> lowKeyVarList,
+ List<LogicalVariable> highKeyVarList) {
super(idx, requiresBroadcast);
this.isPrimaryIndex = isPrimaryIndex;
this.isEqCondition = isEqCondition;
@@ -57,6 +59,14 @@
this.highKeyVarList = highKeyVarList;
}
+ public void setImplConfig(Object implConfig) {
+ this.implConfig = implConfig;
+ }
+
+ public Object getImplConfig() {
+ return implConfig;
+ }
+
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.BTREE_SEARCH;
@@ -89,16 +99,16 @@
VariableUtilities.getLiveVariables(unnestMap, outputVars);
}
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = metadataProvider.buildBtreeRuntime(
- builder.getJobSpec(), outputVars, opSchema, typeEnv, context, jobGenParams.getRetainInput(),
- dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, jobGenParams.isLowKeyInclusive(),
- jobGenParams.isHighKeyInclusive());
+ builder.getJobSpec(), outputVars, opSchema, typeEnv, context, jobGenParams.getRetainInput(), dataset,
+ jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, jobGenParams.isLowKeyInclusive(),
+ jobGenParams.isHighKeyInclusive(), implConfig);
builder.contributeHyracksOperator(unnestMap, btreeSearch.first);
builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, btreeSearch.second);
ILogicalOperator srcExchange = unnestMap.getInputs().get(0).getValue();
builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
}
-
+
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
if (requiresBroadcast) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 053028a..68d6a81 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -30,6 +30,7 @@
import edu.uci.ics.asterix.optimizer.rules.InlineUnnestFunctionRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceInstantLockSearchCallbackRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceStaticTypeCastRule;
import edu.uci.ics.asterix.optimizer.rules.LoadRecordFieldsRule;
@@ -229,6 +230,7 @@
physicalRewritesAllLevels.add(new ReplaceSinkOpWithCommitOpRule());
physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
+ physicalRewritesAllLevels.add(new IntroduceInstantLockSearchCallbackRule());
physicalRewritesAllLevels.add(new EnforceStructuralPropertiesRule());
physicalRewritesAllLevels.add(new IntroHashPartitionMergeExchange());
physicalRewritesAllLevels.add(new SetClosedRecordConstructorsRule());
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
new file mode 100644
index 0000000..62cca6c
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.algebra.operators.CommitOperator;
+import edu.uci.ics.asterix.algebra.operators.physical.BTreeSearchPOperator;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataImplConfig;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class IntroduceInstantLockSearchCallbackRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ private void extractDataSourcesInfo(AbstractLogicalOperator op,
+ Map<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> dataSourcesMap) {
+
+ for (int i = 0; i < op.getInputs().size(); ++i) {
+ AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+
+ if (descendantOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
+ UnnestMapOperator unnestMapOp = (UnnestMapOperator) descendantOp;
+ ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
+ if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+ AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
+ FunctionIdentifier fid = f.getFunctionIdentifier();
+ if (!fid.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+ throw new IllegalStateException();
+ }
+ AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
+ jobGenParams.readFromFuncArgs(f.getArguments());
+ boolean isPrimaryIndex = jobGenParams.isPrimaryIndex();
+ String indexName = jobGenParams.getIndexName();
+ if (isPrimaryIndex) {
+ if (dataSourcesMap.containsKey(indexName)) {
+ ++(dataSourcesMap.get(indexName).first);
+ } else {
+ dataSourcesMap.put(indexName, new Triple<Integer, LogicalOperatorTag, IPhysicalOperator>(1,
+ LogicalOperatorTag.UNNEST_MAP, unnestMapOp.getPhysicalOperator()));
+ }
+ }
+ }
+ } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+ DataSourceScanOperator dataSourceScanOp = (DataSourceScanOperator) descendantOp;
+ String datasetName = ((AqlDataSource) dataSourceScanOp.getDataSource()).getDataset().getDatasetName();
+ if (dataSourcesMap.containsKey(datasetName)) {
+ ++(dataSourcesMap.get(datasetName).first);
+ } else {
+ dataSourcesMap.put(datasetName, new Triple<Integer, LogicalOperatorTag, IPhysicalOperator>(1,
+ LogicalOperatorTag.DATASOURCESCAN, dataSourceScanOp.getPhysicalOperator()));
+ }
+ }
+ extractDataSourcesInfo(descendantOp, dataSourcesMap);
+ }
+
+ }
+
+ private boolean checkIfRuleIsApplicable(AbstractLogicalOperator op) {
+ if (op.getPhysicalOperator() == null) {
+ return false;
+ }
+ if (op.getOperatorTag() == LogicalOperatorTag.EXTENSION_OPERATOR) {
+ ExtensionOperator extensionOp = (ExtensionOperator) op;
+ if (extensionOp.getDelegate() instanceof CommitOperator) {
+ return true;
+ }
+ }
+ if (op.getOperatorTag() == LogicalOperatorTag.DISTRIBUTE_RESULT
+ || op.getOperatorTag() == LogicalOperatorTag.WRITE_RESULT) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+
+ if (!checkIfRuleIsApplicable(op)) {
+ return false;
+ }
+ Map<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> dataSourcesMap = new HashMap<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>>();
+ extractDataSourcesInfo(op, dataSourcesMap);
+
+ boolean introducedInstantLock = false;
+
+ Iterator<Map.Entry<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>>> it = dataSourcesMap
+ .entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> entry = it.next();
+ Triple<Integer, LogicalOperatorTag, IPhysicalOperator> triple = entry.getValue();
+ if (triple.first == 1) {
+ AqlMetadataImplConfig aqlMetadataImplConfig = new AqlMetadataImplConfig(true);
+ if (triple.second == LogicalOperatorTag.UNNEST_MAP) {
+ BTreeSearchPOperator pOperator = (BTreeSearchPOperator) triple.third;
+ pOperator.setImplConfig(aqlMetadataImplConfig);
+ introducedInstantLock = true;
+ } else {
+ DataSourceScanPOperator pOperator = (DataSourceScanPOperator) triple.third;
+ pOperator.setImplConfig(aqlMetadataImplConfig);
+ introducedInstantLock = true;
+ }
+ }
+
+ }
+ return introducedInstantLock;
+ }
+}
\ No newline at end of file
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
index e3a9e91..84e152a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
@@ -59,7 +59,6 @@
retainInput = AccessMethodUtils.getBooleanConstant(funcArgs.get(4));
requiresBroadcast = AccessMethodUtils.getBooleanConstant(funcArgs.get(5));
isPrimaryIndex = datasetName.equals(indexName);
- isPrimaryIndex = datasetName.equals(indexName);
}
public String getIndexName() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataImplConfig.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataImplConfig.java
new file mode 100644
index 0000000..4ea752b
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataImplConfig.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.declared;
+
+public class AqlMetadataImplConfig {
+ private final boolean useInstantLock;
+
+ public AqlMetadataImplConfig(boolean useInstantLock) {
+ this.useInstantLock = useInstantLock;
+ }
+
+ public boolean isInstantLock() {
+ return useInstantLock;
+ }
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 938dfc4..e1f707c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -68,6 +68,7 @@
import edu.uci.ics.asterix.runtime.formats.FormatUtils;
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
@@ -247,7 +248,7 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
List<LogicalVariable> projectVariables, boolean projectPushed, IOperatorSchema opSchema,
- IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
+ IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
throws AlgebricksException {
Dataset dataset;
try {
@@ -264,12 +265,12 @@
return buildExternalDatasetScan(jobSpec, dataset, dataSource);
} else {
return buildInternalDatasetScan(jobSpec, scanVariables, opSchema, typeEnv, dataset, dataSource,
- context);
+ context, implConfig);
}
case INTERNAL: {
return buildInternalDatasetScan(jobSpec, scanVariables, opSchema, typeEnv, dataset, dataSource,
- context);
+ context, implConfig);
}
case EXTERNAL: {
return buildExternalDatasetScan(jobSpec, dataset, dataSource);
@@ -285,14 +286,14 @@
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
- Dataset dataset, IDataSource<AqlSourceId> dataSource, JobGenContext context) throws AlgebricksException,
- MetadataException {
+ Dataset dataset, IDataSource<AqlSourceId> dataSource, JobGenContext context, Object implConfig)
+ throws AlgebricksException, MetadataException {
AqlSourceId asid = dataSource.getId();
String dataverseName = asid.getDataverseName();
String datasetName = asid.getDatasetName();
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, context, false, dataset,
- primaryIndex.getIndexName(), null, null, true, true);
+ primaryIndex.getIndexName(), null, null, true, true, implConfig);
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
@@ -467,7 +468,8 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] lowKeyFields,
- int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) throws AlgebricksException {
+ int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, Object implConfig)
+ throws AlgebricksException {
boolean isSecondary = true;
try {
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -523,11 +525,16 @@
primaryKeyFields[i] = i;
}
+ AqlMetadataImplConfig aqlMetadataImplConfig = (AqlMetadataImplConfig) implConfig;
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId,
- primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ if (aqlMetadataImplConfig != null && aqlMetadataImplConfig.isInstantLock()) {
+ searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId,
+ primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ } else {
+ searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId,
+ primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ }
}
-
BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,