Implemented the IntroduceInstantLockSearchCallbackRule which is statically going to inspect the unnest_map and data_source_scan operators in the plan. If there is only a single access to the primary index, then an instant-lock search callback will be passed to the LSM-BTree at runtime. Otherwise, the usual search callback (the one that hold locks until the job commit) will be passed to the LSM-BTree. All test cases pass.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1476 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index f0880ec..efe01a5 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -47,9 +47,11 @@
     private final List<LogicalVariable> highKeyVarList;
     private final boolean isPrimaryIndex;
     private final boolean isEqCondition;
+    private Object implConfig;
 
     public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
-            boolean isPrimaryIndex, boolean isEqCondition, List<LogicalVariable> lowKeyVarList, List<LogicalVariable> highKeyVarList) {
+            boolean isPrimaryIndex, boolean isEqCondition, List<LogicalVariable> lowKeyVarList,
+            List<LogicalVariable> highKeyVarList) {
         super(idx, requiresBroadcast);
         this.isPrimaryIndex = isPrimaryIndex;
         this.isEqCondition = isEqCondition;
@@ -57,6 +59,14 @@
         this.highKeyVarList = highKeyVarList;
     }
 
+    public void setImplConfig(Object implConfig) {
+        this.implConfig = implConfig;
+    }
+
+    public Object getImplConfig() {
+        return implConfig;
+    }
+
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.BTREE_SEARCH;
@@ -89,16 +99,16 @@
             VariableUtilities.getLiveVariables(unnestMap, outputVars);
         }
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = metadataProvider.buildBtreeRuntime(
-                builder.getJobSpec(), outputVars, opSchema, typeEnv,  context, jobGenParams.getRetainInput(),
-                dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, jobGenParams.isLowKeyInclusive(),
-                jobGenParams.isHighKeyInclusive());
+                builder.getJobSpec(), outputVars, opSchema, typeEnv, context, jobGenParams.getRetainInput(), dataset,
+                jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, jobGenParams.isLowKeyInclusive(),
+                jobGenParams.isHighKeyInclusive(), implConfig);
         builder.contributeHyracksOperator(unnestMap, btreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, btreeSearch.second);
 
         ILogicalOperator srcExchange = unnestMap.getInputs().get(0).getValue();
         builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
     }
-    
+
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
         if (requiresBroadcast) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 053028a..68d6a81 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.asterix.optimizer.rules.InlineUnnestFunctionRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceInstantLockSearchCallbackRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceStaticTypeCastRule;
 import edu.uci.ics.asterix.optimizer.rules.LoadRecordFieldsRule;
@@ -229,6 +230,7 @@
         physicalRewritesAllLevels.add(new ReplaceSinkOpWithCommitOpRule());
         physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
+        physicalRewritesAllLevels.add(new IntroduceInstantLockSearchCallbackRule());
         physicalRewritesAllLevels.add(new EnforceStructuralPropertiesRule());
         physicalRewritesAllLevels.add(new IntroHashPartitionMergeExchange());
         physicalRewritesAllLevels.add(new SetClosedRecordConstructorsRule());
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
new file mode 100644
index 0000000..62cca6c
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.algebra.operators.CommitOperator;
+import edu.uci.ics.asterix.algebra.operators.physical.BTreeSearchPOperator;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataImplConfig;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class IntroduceInstantLockSearchCallbackRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    private void extractDataSourcesInfo(AbstractLogicalOperator op,
+            Map<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> dataSourcesMap) {
+
+        for (int i = 0; i < op.getInputs().size(); ++i) {
+            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+
+            if (descendantOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
+                UnnestMapOperator unnestMapOp = (UnnestMapOperator) descendantOp;
+                ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
+                if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                    AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
+                    FunctionIdentifier fid = f.getFunctionIdentifier();
+                    if (!fid.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+                        throw new IllegalStateException();
+                    }
+                    AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
+                    jobGenParams.readFromFuncArgs(f.getArguments());
+                    boolean isPrimaryIndex = jobGenParams.isPrimaryIndex();
+                    String indexName = jobGenParams.getIndexName();
+                    if (isPrimaryIndex) {
+                        if (dataSourcesMap.containsKey(indexName)) {
+                            ++(dataSourcesMap.get(indexName).first);
+                        } else {
+                            dataSourcesMap.put(indexName, new Triple<Integer, LogicalOperatorTag, IPhysicalOperator>(1,
+                                    LogicalOperatorTag.UNNEST_MAP, unnestMapOp.getPhysicalOperator()));
+                        }
+                    }
+                }
+            } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+                DataSourceScanOperator dataSourceScanOp = (DataSourceScanOperator) descendantOp;
+                String datasetName = ((AqlDataSource) dataSourceScanOp.getDataSource()).getDataset().getDatasetName();
+                if (dataSourcesMap.containsKey(datasetName)) {
+                    ++(dataSourcesMap.get(datasetName).first);
+                } else {
+                    dataSourcesMap.put(datasetName, new Triple<Integer, LogicalOperatorTag, IPhysicalOperator>(1,
+                            LogicalOperatorTag.DATASOURCESCAN, dataSourceScanOp.getPhysicalOperator()));
+                }
+            }
+            extractDataSourcesInfo(descendantOp, dataSourcesMap);
+        }
+
+    }
+
+    private boolean checkIfRuleIsApplicable(AbstractLogicalOperator op) {
+        if (op.getPhysicalOperator() == null) {
+            return false;
+        }
+        if (op.getOperatorTag() == LogicalOperatorTag.EXTENSION_OPERATOR) {
+            ExtensionOperator extensionOp = (ExtensionOperator) op;
+            if (extensionOp.getDelegate() instanceof CommitOperator) {
+                return true;
+            }
+        }
+        if (op.getOperatorTag() == LogicalOperatorTag.DISTRIBUTE_RESULT
+                || op.getOperatorTag() == LogicalOperatorTag.WRITE_RESULT) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+
+        if (!checkIfRuleIsApplicable(op)) {
+            return false;
+        }
+        Map<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> dataSourcesMap = new HashMap<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>>();
+        extractDataSourcesInfo(op, dataSourcesMap);
+
+        boolean introducedInstantLock = false;
+
+        Iterator<Map.Entry<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>>> it = dataSourcesMap
+                .entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> entry = it.next();
+            Triple<Integer, LogicalOperatorTag, IPhysicalOperator> triple = entry.getValue();
+            if (triple.first == 1) {
+                AqlMetadataImplConfig aqlMetadataImplConfig = new AqlMetadataImplConfig(true);
+                if (triple.second == LogicalOperatorTag.UNNEST_MAP) {
+                    BTreeSearchPOperator pOperator = (BTreeSearchPOperator) triple.third;
+                    pOperator.setImplConfig(aqlMetadataImplConfig);
+                    introducedInstantLock = true;
+                } else {
+                    DataSourceScanPOperator pOperator = (DataSourceScanPOperator) triple.third;
+                    pOperator.setImplConfig(aqlMetadataImplConfig);
+                    introducedInstantLock = true;
+                }
+            }
+
+        }
+        return introducedInstantLock;
+    }
+}
\ No newline at end of file
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
index e3a9e91..84e152a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
@@ -59,7 +59,6 @@
         retainInput = AccessMethodUtils.getBooleanConstant(funcArgs.get(4));
         requiresBroadcast = AccessMethodUtils.getBooleanConstant(funcArgs.get(5));
         isPrimaryIndex = datasetName.equals(indexName);
-        isPrimaryIndex = datasetName.equals(indexName);
     }
 
     public String getIndexName() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataImplConfig.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataImplConfig.java
new file mode 100644
index 0000000..4ea752b
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataImplConfig.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.declared;
+
+public class AqlMetadataImplConfig {
+    private final boolean useInstantLock;
+
+    public AqlMetadataImplConfig(boolean useInstantLock) {
+        this.useInstantLock = useInstantLock;
+    }
+
+    public boolean isInstantLock() {
+        return useInstantLock;
+    }
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 938dfc4..e1f707c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -68,6 +68,7 @@
 import edu.uci.ics.asterix.runtime.formats.FormatUtils;
 import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
 import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
@@ -247,7 +248,7 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
             IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
             List<LogicalVariable> projectVariables, boolean projectPushed, IOperatorSchema opSchema,
-            IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
+            IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
             throws AlgebricksException {
         Dataset dataset;
         try {
@@ -264,12 +265,12 @@
                         return buildExternalDatasetScan(jobSpec, dataset, dataSource);
                     } else {
                         return buildInternalDatasetScan(jobSpec, scanVariables, opSchema, typeEnv, dataset, dataSource,
-                                context);
+                                context, implConfig);
 
                     }
                 case INTERNAL: {
                     return buildInternalDatasetScan(jobSpec, scanVariables, opSchema, typeEnv, dataset, dataSource,
-                            context);
+                            context, implConfig);
                 }
                 case EXTERNAL: {
                     return buildExternalDatasetScan(jobSpec, dataset, dataSource);
@@ -285,14 +286,14 @@
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
-            Dataset dataset, IDataSource<AqlSourceId> dataSource, JobGenContext context) throws AlgebricksException,
-            MetadataException {
+            Dataset dataset, IDataSource<AqlSourceId> dataSource, JobGenContext context, Object implConfig)
+            throws AlgebricksException, MetadataException {
         AqlSourceId asid = dataSource.getId();
         String dataverseName = asid.getDataverseName();
         String datasetName = asid.getDatasetName();
         Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
         return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, context, false, dataset,
-                primaryIndex.getIndexName(), null, null, true, true);
+                primaryIndex.getIndexName(), null, null, true, true, implConfig);
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
@@ -467,7 +468,8 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
             JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] lowKeyFields,
-            int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) throws AlgebricksException {
+            int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, Object implConfig)
+            throws AlgebricksException {
         boolean isSecondary = true;
         try {
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -523,11 +525,16 @@
                     primaryKeyFields[i] = i;
                 }
 
+                AqlMetadataImplConfig aqlMetadataImplConfig = (AqlMetadataImplConfig) implConfig;
                 TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-                searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId,
-                        primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+                if (aqlMetadataImplConfig != null && aqlMetadataImplConfig.isInstantLock()) {
+                    searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId,
+                            primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+                } else {
+                    searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId,
+                            primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+                }
             }
-
             BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
                     appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
                     typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,