Merge branch 'master' of https://code.google.com/p/asterixdb into icetindil/issue_765
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
index ba940d6..9fc2a12 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
@@ -384,8 +384,14 @@
     }
 
     @Override
-    public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, ILogicalOperator arg) {
-        throw new UnsupportedOperationException();
+    public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, ILogicalOperator arg) throws AlgebricksException { // returns a copy of op
+        UnnestMapOperator opCopy = new UnnestMapOperator(deepCopyVariableList(op.getVariables()), // copied variable list
+                exprDeepCopyVisitor.deepCopyExpressionReference(op.getExpressionRef()), // copied expression reference
+                op.getVariableTypes(), op.propagatesInput()); // types and propagate-input flag are passed on exactly as op returns them
+        deepCopyInputs(op, opCopy, arg);
+        copyAnnotations(op, opCopy);
+        opCopy.setExecutionMode(op.getExecutionMode()); // the copy keeps the execution mode of the original
+        return opCopy;
     }
 
     @Override
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index 1023c6d..4c01708 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -295,7 +295,9 @@
                         }
                         // Set the fieldName in the corresponding matched function expression.
                         optFuncExpr.setFieldName(funcVarIndex, fieldName);
-                        fillIndexExprs(fieldName, optFuncExprIndex, subTree.dataset, analysisCtx);
+                        if (subTree.hasDataSourceScan()) {
+                            fillIndexExprs(fieldName, optFuncExprIndex, subTree.dataset, analysisCtx);
+                        }
                     }
                 }
                 else {
@@ -315,12 +317,14 @@
                     }
                     // Set the fieldName in the corresponding matched function expression.
                     optFuncExpr.setFieldName(funcVarIndex, fieldName);
-                    fillIndexExprs(fieldName, optFuncExprIndex, subTree.dataset, analysisCtx);
+                    if (subTree.hasDataSourceScan()) {
+                        fillIndexExprs(fieldName, optFuncExprIndex, subTree.dataset, analysisCtx);
+                    }
                 }
             }
 
             // Try to match variables from optFuncExpr to datasourcescan if not already matched in assigns.
-            List<LogicalVariable> dsVarList = subTree.dataSourceScan.getVariables();
+            List<LogicalVariable> dsVarList = subTree.getDataSourceVariables();
             for (int varIndex = 0; varIndex < dsVarList.size(); varIndex++) {
                 LogicalVariable var = dsVarList.get(varIndex);
                 int funcVarIndex = optFuncExpr.findLogicalVar(var);
@@ -333,7 +337,9 @@
                 // Set the fieldName in the corresponding matched function expression, and remember matching subtree.
                 optFuncExpr.setFieldName(funcVarIndex, fieldName);
                 optFuncExpr.setOptimizableSubTree(funcVarIndex, subTree);
-                fillIndexExprs(fieldName, optFuncExprIndex, subTree.dataset, analysisCtx);
+                if (subTree.hasDataSourceScan()) {
+                    fillIndexExprs(fieldName, optFuncExprIndex, subTree.dataset, analysisCtx);
+                }
             }
         }
     }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
index f2211e6..f5e6378 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -198,6 +198,10 @@
                     }
                     break;
                 }
+                case LENGTH_PARTITIONED_NGRAM_INVIX:
+                case LENGTH_PARTITIONED_WORD_INVIX:
+                default:
+                    break;
             }
         }
         // Primary keys.
@@ -222,7 +226,7 @@
         }
     }
 
-    public static List<LogicalVariable> getPrimaryKeyVarsFromUnnestMap(Dataset dataset, ILogicalOperator unnestMapOp) {
+    public static List<LogicalVariable> getPrimaryKeyVarsFromSecondaryUnnestMap(Dataset dataset, ILogicalOperator unnestMapOp) {
         int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
         List<LogicalVariable> primaryKeyVars = new ArrayList<LogicalVariable>();
         List<LogicalVariable> sourceVars = ((UnnestMapOperator) unnestMapOp).getVariables();
@@ -235,6 +239,17 @@
         return primaryKeyVars;
     }
 
+    public static List<LogicalVariable> getPrimaryKeyVarsFromPrimaryUnnestMap(Dataset dataset, ILogicalOperator unnestMapOp) {
+        final int pkCount = DatasetUtils.getPartitioningKeys(dataset).size();
+        final List<LogicalVariable> pkVars = new ArrayList<LogicalVariable>();
+        final List<LogicalVariable> unnestMapVars = ((UnnestMapOperator) unnestMapOp).getVariables();
+        // The leading pkCount variables of the unnest-map are assumed to be its primary keys; keep their order.
+        for (int pos = 0; pos != pkCount; ++pos) {
+            pkVars.add(unnestMapVars.get(pos));
+        }
+        return pkVars;
+    }
+
     /**
      * Returns the search key expression which feeds a secondary-index search. If we are optimizing a selection query then this method returns
      * the a ConstantExpression from the first constant value in the optimizable function expression.
@@ -295,7 +310,7 @@
     public static UnnestMapOperator createPrimaryIndexUnnestMap(DataSourceScanOperator dataSourceScan, Dataset dataset,
             ARecordType recordType, ILogicalOperator inputOp, IOptimizationContext context, boolean sortPrimaryKeys,
             boolean retainInput, boolean requiresBroadcast) throws AlgebricksException {
-        List<LogicalVariable> primaryKeyVars = AccessMethodUtils.getPrimaryKeyVarsFromUnnestMap(dataset, inputOp);
+        List<LogicalVariable> primaryKeyVars = AccessMethodUtils.getPrimaryKeyVarsFromSecondaryUnnestMap(dataset, inputOp);
         // Optionally add a sort on the primary-index keys before searching the primary index.
         OrderOperator order = null;
         if (sortPrimaryKeys) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
index 701efd9..fc50ec5 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
@@ -127,7 +127,7 @@
         if (conditionRef.getValue() != null) {
             select.getInputs().clear();
             if (op != null) {
-                subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+                subTree.dataSourceRef.setValue(primaryIndexUnnestOp);
                 select.getInputs().add(new MutableObject<ILogicalOperator>(op));
             } else {
                 select.getInputs().add(new MutableObject<ILogicalOperator>(primaryIndexUnnestOp));
@@ -135,7 +135,7 @@
         } else {
             ((AbstractLogicalOperator) primaryIndexUnnestOp).setExecutionMode(ExecutionMode.PARTITIONED);
             if (op != null) {
-                subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+                subTree.dataSourceRef.setValue(primaryIndexUnnestOp);
                 selectRef.setValue(op);
             } else {
                 selectRef.setValue(primaryIndexUnnestOp);
@@ -155,10 +155,11 @@
         // Determine probe and index subtrees based on chosen index.
         OptimizableOperatorSubTree indexSubTree = null;
         OptimizableOperatorSubTree probeSubTree = null;
-        if (leftSubTree.dataset != null && dataset.getDatasetName().equals(leftSubTree.dataset.getDatasetName())) {
+        if (leftSubTree.hasDataSourceScan() && leftSubTree.dataset != null
+                && dataset.getDatasetName().equals(leftSubTree.dataset.getDatasetName())) {
             indexSubTree = leftSubTree;
             probeSubTree = rightSubTree;
-        } else if (rightSubTree.dataset != null
+        } else if (rightSubTree.hasDataSourceScan() && rightSubTree.dataset != null
                 && dataset.getDatasetName().equals(rightSubTree.dataset.getDatasetName())) {
             indexSubTree = rightSubTree;
             probeSubTree = leftSubTree;
@@ -169,7 +170,7 @@
             return false;
         }
         // If there are conditions left, add a new select operator on top.
-        indexSubTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+        indexSubTree.dataSourceRef.setValue(primaryIndexUnnestOp);
         if (conditionRef.getValue() != null) {
             SelectOperator topSelect = new SelectOperator(conditionRef);
             topSelect.getInputs().add(indexSubTree.rootRef);
@@ -189,7 +190,8 @@
             boolean retainInput, boolean requiresBroadcast, IOptimizationContext context) throws AlgebricksException {
         Dataset dataset = indexSubTree.dataset;
         ARecordType recordType = indexSubTree.recordType;
-        DataSourceScanOperator dataSourceScan = indexSubTree.dataSourceScan;
+        // we made sure indexSubTree has datasource scan
+        DataSourceScanOperator dataSourceScan = (DataSourceScanOperator) indexSubTree.dataSourceRef.getValue();
         int numSecondaryKeys = chosenIndex.getKeyFieldNames().size();
 
         // Info on high and low keys for the BTree search predicate.
@@ -423,7 +425,7 @@
 
             // Replace the datasource scan with the new plan rooted at
             // primaryIndexUnnestMap.
-            indexSubTree.dataSourceScanRef.setValue(primaryIndexUnnestOp); //kisskys
+            indexSubTree.dataSourceRef.setValue(primaryIndexUnnestOp);
         } else {
             List<Object> primaryIndexOutputTypes = new ArrayList<Object>();
             try {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
index 1ee8107..32d459d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
@@ -39,9 +39,10 @@
  * This rule optimizes a join with secondary indexes into an indexed nested-loop join.
  * Matches the following operator pattern:
  * (join) <-- (select)? <-- (assign | unnest)+ <-- (datasource scan)
- * <-- (select)? <-- (assign | unnest)+ <-- (datasource scan)
+ * <-- (select)? <-- (assign | unnest)+ <-- (datasource scan | unnest-map)
+ * The order of the join inputs does not matter.
  * Replaces the above pattern with the following simplified plan:
- * (select) <-- (assign) <-- (btree search) <-- (sort) <-- (unnest(index search)) <-- (assign) <-- (datasource scan)
+ * (select) <-- (assign) <-- (btree search) <-- (sort) <-- (unnest(index search)) <-- (assign) <-- (datasource scan | unnest-map)
  * The sort is optional, and some access methods may choose not to sort.
  * Note that for some index-based optimizations we do not remove the triggering
  * condition from the join, since the secondary index may only act as a filter, and the
@@ -52,8 +53,6 @@
  * 3. Check metadata to see if there are applicable indexes.
  * 4. Choose an index to apply (for now only a single index will be chosen).
  * 5. Rewrite plan using index (delegated to IAccessMethods).
- * TODO (Alex): Currently this rule requires a data scan on both inputs of the join. I should generalize the pattern
- * to accept any subtree on one side, as long as the other side has a datasource scan.
  */
 public class IntroduceJoinAccessMethodRule extends AbstractIntroduceAccessMethodRule {
 
@@ -85,10 +84,10 @@
         Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs = new HashMap<IAccessMethod, AccessMethodAnalysisContext>();
         boolean matchInLeftSubTree = false;
         boolean matchInRightSubTree = false;
-        if (leftSubTree.hasDataSourceScan()) {
+        if (leftSubTree.hasDataSource()) {
             matchInLeftSubTree = analyzeCondition(joinCond, leftSubTree.assignsAndUnnests, analyzedAMs);
         }
-        if (rightSubTree.hasDataSourceScan()) {
+        if (rightSubTree.hasDataSource()) {
             matchInRightSubTree = analyzeCondition(joinCond, rightSubTree.assignsAndUnnests, analyzedAMs);
         }
         if (!matchInLeftSubTree && !matchInRightSubTree) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
index 9ae8c9f..28ec41d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
@@ -137,7 +137,8 @@
             return false;
         }
         selectCond = (AbstractFunctionCallExpression) condExpr;
-        return subTree.initFromSubTree(op1.getInputs().get(0));
+        boolean res = subTree.initFromSubTree(op1.getInputs().get(0));
+        return res && subTree.hasDataSourceScan();
     }
 
     @Override
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 1a7a747..9c49783 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.asterix.common.annotations.SkipSecondaryIndexSearchExpressionAnnotation;
 import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryTokenizerFactoryProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.om.base.AFloat;
@@ -337,7 +338,8 @@
             boolean retainInput, boolean requiresBroadcast, IOptimizationContext context) throws AlgebricksException {
         Dataset dataset = indexSubTree.dataset;
         ARecordType recordType = indexSubTree.recordType;
-        DataSourceScanOperator dataSourceScan = indexSubTree.dataSourceScan;
+        // we made sure indexSubTree has datasource scan
+        DataSourceScanOperator dataSourceScan = (DataSourceScanOperator) indexSubTree.dataSourceRef.getValue();
 
         InvertedIndexJobGenParams jobGenParams = new InvertedIndexJobGenParams(chosenIndex.getIndexName(),
                 chosenIndex.getIndexType(), dataset.getDataverseName(), dataset.getDatasetName(), retainInput,
@@ -401,7 +403,7 @@
         ILogicalOperator indexPlanRootOp = createSecondaryToPrimaryPlan(subTree, null, chosenIndex, optFuncExpr, false,
                 false, context);
         // Replace the datasource scan with the new plan rooted at primaryIndexUnnestMap.
-        subTree.dataSourceScanRef.setValue(indexPlanRootOp);
+        subTree.dataSourceRef.setValue(indexPlanRootOp);
         return true;
     }
 
@@ -414,12 +416,16 @@
         // Determine probe and index subtrees based on chosen index.
         OptimizableOperatorSubTree indexSubTree = null;
         OptimizableOperatorSubTree probeSubTree = null;
-        if (leftSubTree.dataset != null && dataset.getDatasetName().equals(leftSubTree.dataset.getDatasetName())) {
+        if (leftSubTree.hasDataSourceScan() && leftSubTree.dataset != null
+                && dataset.getDatasetName().equals(leftSubTree.dataset.getDatasetName())) {
             indexSubTree = leftSubTree;
             probeSubTree = rightSubTree;
-        } else if (rightSubTree.dataset != null && dataset.getDatasetName().equals(rightSubTree.dataset.getDatasetName())) {
+        } else if (rightSubTree.hasDataSourceScan() && rightSubTree.dataset != null
+                && dataset.getDatasetName().equals(rightSubTree.dataset.getDatasetName())) {
             indexSubTree = rightSubTree;
             probeSubTree = leftSubTree;
+        } else {
+            return false;
         }
         IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
         // The arguments of edit-distance-contains() function are asymmetrical, we can only use index if the dataset of index subtree and the dataset of first argument's subtree is the same     
@@ -463,7 +469,7 @@
         // Create regular indexed-nested loop join path.
         ILogicalOperator indexPlanRootOp = createSecondaryToPrimaryPlan(indexSubTree, probeSubTree, chosenIndex,
                 optFuncExpr, true, true, context);
-        indexSubTree.dataSourceScanRef.setValue(indexPlanRootOp);
+        indexSubTree.dataSourceRef.setValue(indexPlanRootOp);
 
         // Change join into a select with the same condition.
         SelectOperator topSelect = new SelectOperator(new MutableObject<ILogicalExpression>(joinCond));
@@ -521,7 +527,8 @@
             ILogicalExpression joinCond, IOptimizableFuncExpr optFuncExpr, List<LogicalVariable> originalSubTreePKs,
             List<LogicalVariable> surrogateSubTreePKs, IOptimizationContext context) throws AlgebricksException {
 
-        probeSubTree.getPrimaryKeyVars(originalSubTreePKs);
+        AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
+        probeSubTree.getPrimaryKeyVars(originalSubTreePKs, metadataProvider);
 
         // Create two copies of the original probe subtree.
         // The first copy, which becomes the new probe subtree, will retain the primary-key and secondary-search key variables,
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
index 33ca4d1..195e8e9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
@@ -23,31 +23,43 @@
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.optimizer.base.AnalysisUtil;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 
 /**
  * Operator subtree that matches the following patterns, and provides convenient access to its nodes:
- * (select)? <-- (assign | unnest)+ <-- (datasource scan)
+ * (select)? <-- (assign | unnest)+ <-- (datasource scan | unnest-map)
  * and
- * (select)? <-- (datasource scan)
+ * (select)? <-- (datasource scan | unnest-map)
  */
 public class OptimizableOperatorSubTree {
+
+    public static enum DataSourceType { // kind of operator recorded in dataSourceRef
+        DATASOURCE_SCAN, // a datasource-scan operator was matched
+        PRIMARY_INDEX_LOOKUP, // an unnest-map running an index-search on a primary index was matched
+        NO_DATASOURCE // initial value; also restored by reset()
+    }
+
     public ILogicalOperator root = null;
     public Mutable<ILogicalOperator> rootRef = null;
     public final List<Mutable<ILogicalOperator>> assignsAndUnnestsRefs = new ArrayList<Mutable<ILogicalOperator>>();
     public final List<AbstractLogicalOperator> assignsAndUnnests = new ArrayList<AbstractLogicalOperator>();
-    public Mutable<ILogicalOperator> dataSourceScanRef = null;
-    public DataSourceScanOperator dataSourceScan = null;
+    public Mutable<ILogicalOperator> dataSourceRef = null;
+    public DataSourceType dataSourceType = DataSourceType.NO_DATASOURCE;
     // Dataset and type metadata. Set in setDatasetAndTypeMetadata().
     public Dataset dataset = null;
     public ARecordType recordType = null;
@@ -67,9 +79,24 @@
         if (subTreeOp.getOperatorTag() != LogicalOperatorTag.ASSIGN && subTreeOp.getOperatorTag() != LogicalOperatorTag.UNNEST) {
             // Pattern may still match if we are looking for primary index matches as well.
             if (subTreeOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
-                dataSourceScanRef = subTreeOpRef;
-                dataSourceScan = (DataSourceScanOperator) subTreeOp;
+                dataSourceType = DataSourceType.DATASOURCE_SCAN;
+                dataSourceRef = subTreeOpRef;
                 return true;
+            } else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
+                UnnestMapOperator unnestMapOp = (UnnestMapOperator) subTreeOp;
+                ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
+                if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                    AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
+                    if (f.getFunctionIdentifier().equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+                        AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
+                        jobGenParams.readFromFuncArgs(f.getArguments());
+                        if(jobGenParams.isPrimaryIndex()) {
+                            dataSourceType = DataSourceType.PRIMARY_INDEX_LOOKUP;
+                            dataSourceRef = subTreeOpRef;
+                            return true;
+                        }
+                    }
+                }
             }
             return false;
         }
@@ -82,12 +109,30 @@
             subTreeOp = (AbstractLogicalOperator) subTreeOpRef.getValue();
         } while (subTreeOp.getOperatorTag() == LogicalOperatorTag.ASSIGN || subTreeOp.getOperatorTag() == LogicalOperatorTag.UNNEST);
 
+        // Match index search.
+        if (subTreeOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
+            UnnestMapOperator unnestMapOp = (UnnestMapOperator) subTreeOp;
+            ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
+            if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
+                if (f.getFunctionIdentifier().equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+                    AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
+                    jobGenParams.readFromFuncArgs(f.getArguments());
+                    if(jobGenParams.isPrimaryIndex()) {
+                        dataSourceType = DataSourceType.PRIMARY_INDEX_LOOKUP;
+                        dataSourceRef = subTreeOpRef;
+                        return true;
+                    }
+                }
+            }
+        }
+
         // Match datasource scan.
         if (subTreeOp.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
             return false;
         }
-        dataSourceScanRef = subTreeOpRef;
-        dataSourceScan = (DataSourceScanOperator) subTreeOp;
+        dataSourceType = DataSourceType.DATASOURCE_SCAN;
+        dataSourceRef = subTreeOpRef;
         return true;
     }
 
@@ -96,16 +141,38 @@
      * Also sets recordType to be the type of that dataset.
      */
     public boolean setDatasetAndTypeMetadata(AqlMetadataProvider metadataProvider) throws AlgebricksException {
-        if (dataSourceScan == null) {
-            return false;
+        String dataverseName = null;
+        String datasetName = null;
+        switch (dataSourceType) {
+            case DATASOURCE_SCAN:
+                DataSourceScanOperator dataSourceScan = (DataSourceScanOperator) dataSourceRef.getValue();
+                Pair<String, String> datasetInfo = AnalysisUtil.getDatasetInfo(dataSourceScan);
+                dataverseName = datasetInfo.first;
+                datasetName = datasetInfo.second;
+                break;
+            case PRIMARY_INDEX_LOOKUP:
+                UnnestMapOperator unnestMapOp = (UnnestMapOperator) dataSourceRef.getValue();
+                ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
+                if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                    return false;
+                }
+                AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
+                if (!f.getFunctionIdentifier().equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+                    return false;
+                }
+                AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
+                jobGenParams.readFromFuncArgs(f.getArguments());
+                datasetName = jobGenParams.getDatasetName();
+                dataverseName = jobGenParams.getDataverseName();
+                break;
+            case NO_DATASOURCE:
+            default:
+                break;
         }
-        // Find the dataset corresponding to the datasource scan in the metadata.
-        Pair<String, String> datasetInfo = AnalysisUtil.getDatasetInfo(dataSourceScan);
-        String dataverseName = datasetInfo.first;
-        String datasetName = datasetInfo.second;
         if (dataverseName == null || datasetName == null) {
             return false;
         }
+        // Find the dataset corresponding to the datasource in the metadata.
         dataset = metadataProvider.findDataset(dataverseName, datasetName);
         if (dataset == null) {
             throw new AlgebricksException("No metadata for dataset " + datasetName);
@@ -122,8 +189,12 @@
         return true;
     }
 
+    public boolean hasDataSource() { // true for a datasource scan as well as for a primary-index lookup
+        return dataSourceType != DataSourceType.NO_DATASOURCE;
+    }
+
     public boolean hasDataSourceScan() {
-        return dataSourceScan != null;
+        return dataSourceType == DataSourceType.DATASOURCE_SCAN;
     }
 
     public void reset() {
@@ -131,16 +202,44 @@
         rootRef = null;
         assignsAndUnnestsRefs.clear();
         assignsAndUnnests.clear();
-        dataSourceScanRef = null;
-        dataSourceScan = null;
+        dataSourceRef = null;
+        dataSourceType = DataSourceType.NO_DATASOURCE;
         dataset = null;
         recordType = null;
     }
 
-    public void getPrimaryKeyVars(List<LogicalVariable> target) {
-        int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            target.add(dataSourceScan.getVariables().get(i));
+    public void getPrimaryKeyVars(List<LogicalVariable> target, AqlMetadataProvider metadataProvider) {
+        switch (dataSourceType) {
+            case PRIMARY_INDEX_LOOKUP:
+                // Delegate to the helper that reads the leading variables of a primary-index unnest-map.
+                UnnestMapOperator primaryUnnestMap = (UnnestMapOperator) dataSourceRef.getValue();
+                target.addAll(AccessMethodUtils.getPrimaryKeyVarsFromPrimaryUnnestMap(dataset, primaryUnnestMap));
+                break;
+            case DATASOURCE_SCAN:
+                // The leading variables of the scan are taken as the primary keys.
+                DataSourceScanOperator scanOp = (DataSourceScanOperator) dataSourceRef.getValue();
+                final int pkCount = DatasetUtils.getPartitioningKeys(dataset).size();
+                for (int pkIdx = 0; pkIdx < pkCount; pkIdx++) {
+                    target.add(scanOp.getVariables().get(pkIdx));
+                }
+                break;
+            case NO_DATASOURCE:
+            default:
+                break;
+        }
+    }
+
+    public List<LogicalVariable> getDataSourceVariables() {
+        switch (dataSourceType) {
+            case PRIMARY_INDEX_LOOKUP:
+                // Variables exposed by the primary-index unnest-map.
+                return ((UnnestMapOperator) dataSourceRef.getValue()).getVariables();
+            case DATASOURCE_SCAN:
+                // Variables exposed by the datasource scan.
+                return ((DataSourceScanOperator) dataSourceRef.getValue()).getVariables();
+            case NO_DATASOURCE:
+            default:
+                return new ArrayList<LogicalVariable>();
         }
     }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
index fb5ad49..9fb658c 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
@@ -98,7 +98,7 @@
             return false;
         }
         // Replace the datasource scan with the new plan rooted at primaryIndexUnnestMap.
-        subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+        subTree.dataSourceRef.setValue(primaryIndexUnnestOp);
         return true;
     }
 
@@ -111,10 +111,11 @@
         // Determine probe and index subtrees based on chosen index.
         OptimizableOperatorSubTree indexSubTree = null;
         OptimizableOperatorSubTree probeSubTree = null;
-        if (leftSubTree.dataset != null && dataset.getDatasetName().equals(leftSubTree.dataset.getDatasetName())) {
+        if (leftSubTree.hasDataSourceScan() && leftSubTree.dataset != null
+                && dataset.getDatasetName().equals(leftSubTree.dataset.getDatasetName())) {
             indexSubTree = leftSubTree;
             probeSubTree = rightSubTree;
-        } else if (rightSubTree.dataset != null
+        } else if (rightSubTree.hasDataSourceScan() && rightSubTree.dataset != null
                 && dataset.getDatasetName().equals(rightSubTree.dataset.getDatasetName())) {
             indexSubTree = rightSubTree;
             probeSubTree = leftSubTree;
@@ -126,7 +127,7 @@
         if (primaryIndexUnnestOp == null) {
             return false;
         }
-        indexSubTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+        indexSubTree.dataSourceRef.setValue(primaryIndexUnnestOp);
         // Change join into a select with the same condition.
         AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) joinRef.getValue();
         SelectOperator topSelect = new SelectOperator(joinOp.getCondition());
@@ -149,8 +150,8 @@
         IAType spatialType = keyPairType.first;
         int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
         int numSecondaryKeys = numDimensions * 2;
-
-        DataSourceScanOperator dataSourceScan = indexSubTree.dataSourceScan;
+        // We made sure that indexSubTree has a data source scan.
+        DataSourceScanOperator dataSourceScan = (DataSourceScanOperator) indexSubTree.dataSourceRef.getValue();
         RTreeJobGenParams jobGenParams = new RTreeJobGenParams(chosenIndex.getIndexName(), IndexType.RTREE,
                 dataset.getDataverseName(), dataset.getDatasetName(), retainInput, requiresBroadcast);
         // A spatial object is serialized in the constant of the func expr we are optimizing.
diff --git a/asterix-app/src/test/resources/optimizerts/queries/inverted-index-join/word-jaccard-check-after-btree-access.aql b/asterix-app/src/test/resources/optimizerts/queries/inverted-index-join/word-jaccard-check-after-btree-access.aql
new file mode 100644
index 0000000..d656045
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/inverted-index-join/word-jaccard-check-after-btree-access.aql
@@ -0,0 +1,51 @@
+/*
+ * Description    : Fuzzy self-joins a dataset, TweetMessages, based on the similarity-jaccard-check function of the word tokens of its message-text field.
+ *                  TweetMessages has a keyword index on message-text and a btree index on the primary key tweetid, and we expect the join to be
+ *                  transformed into btree and inverted-index nested-loop joins. We test whether the join condition can be transformed into
+ *                  multiple indexed nested-loop joins over various types of indexes.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TwitterUserType as closed {
+	screen-name: string,
+	lang: string,
+	friends-count: int32,
+	statuses-count: int32,
+	name: string,
+	followers-count: int32
+} 
+
+create type TweetMessageType as closed {
+	tweetid: int64,
+	user: TwitterUserType,
+	sender-location: point,
+	send-time: datetime,
+	referred-topics: {{ string }},
+	message-text: string,
+	countA: int32,
+	countB: int32
+}
+
+create dataset TweetMessages(TweetMessageType)
+primary key tweetid;
+
+create index twmSndLocIx on TweetMessages(sender-location) type rtree;
+create index msgCountAIx on TweetMessages(countA) type btree;
+create index msgCountBIx on TweetMessages(countB) type btree;
+create index msgTextIx on TweetMessages(message-text) type keyword;
+
+write output to nc1:"rttest/inverted-index-join_word-jaccard-check-after-btree-access.adm";
+
+for $t1 in dataset('TweetMessages')
+for $t2 in dataset('TweetMessages')
+let $sim := similarity-jaccard-check($t1.message-text, $t2.message-text, 0.6f)
+where $sim[0] and $t1.tweetid < int64("20") and $t2.tweetid != $t1.tweetid
+return {
+    "t1": $t1.tweetid,
+    "t2": $t2.tweetid,
+    "sim": $sim[1]
+}
diff --git a/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/word-jaccard-check-after-btree-access.plan b/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/word-jaccard-check-after-btree-access.plan
new file mode 100644
index 0000000..237864f
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/word-jaccard-check-after-btree-access.plan
@@ -0,0 +1,34 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- HYBRID_HASH_JOIN [$$34][$$22]  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- BTREE_SEARCH  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$22]  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- BTREE_SEARCH  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STABLE_SORT [$$37(ASC)]  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH  |PARTITIONED|
+                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file