Tuned and cleaned the surrogate-based index NL rewrite with inverted indexes.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fuzzy_perf@877 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 1826ad3..ab2ac27 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -5,7 +5,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -406,53 +405,28 @@
             probeSubTree = leftSubTree;
         }
         IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
+        InnerJoinOperator join = (InnerJoinOperator) joinRef.getValue();
 
-        // Copy probe subtree, replacing their variables with new ones. We will use the original variables
-        // to stitch together a top-level equi join.
-        Counter counter = new Counter(context.getVarCounter());
-        LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
-        ILogicalOperator probeSubTreeCopy = deepCopyVisitor.deepCopy(probeSubTree.root, null);
-        Mutable<ILogicalOperator> probeSubTreeRootRef = new MutableObject<ILogicalOperator>(probeSubTreeCopy);
-        context.setVarCounter(counter.get());
-        
         // Remember the original probe subtree, and its primary-key variables,
         // so we can later retrieve the missing attributes via an equi join.
-        Mutable<ILogicalOperator> originalProbeSubTreeRootRef = probeSubTree.rootRef;
         List<LogicalVariable> originalSubTreePKs = new ArrayList<LogicalVariable>();
         probeSubTree.getPrimaryKeyVars(originalSubTreePKs);
         
-        // Replace the original probe subtree with its copy.
-        Dataset origDataset = probeSubTree.dataset;
-        ARecordType origRecordType = probeSubTree.recordType;
-        probeSubTree.initFromSubTree(probeSubTreeRootRef);
-        inferTypes(probeSubTreeRootRef.getValue(), context);
-        probeSubTree.dataset = origDataset;
-        probeSubTree.recordType = origRecordType;
+        // Copy probe subtree, replacing their variables with new ones. We will use the original variables
+        // to stitch together a top-level equi join.
+        Mutable<ILogicalOperator> originalProbeSubTreeRootRef = copyAndReinitProbeSubTree(probeSubTree, join
+                .getCondition().getValue(), optFuncExpr, context);
 
         // Remember the primary-keys of the new probe subtree for the top-level equi join.
         List<LogicalVariable> surrogateSubTreePKs = new ArrayList<LogicalVariable>();
         probeSubTree.getPrimaryKeyVars(surrogateSubTreePKs);
         
-        InnerJoinOperator join = (InnerJoinOperator) joinRef.getValue();
-        // Replace the variables in the join condition and the optimizable function expression
-        // based on the mapping of variables in the copied probe subtree.
-        Map<LogicalVariable, LogicalVariable> varMapping = deepCopyVisitor.getVariableMapping();
-        for (Map.Entry<LogicalVariable, LogicalVariable> varMapEntry : varMapping.entrySet()) {
-            join.getCondition().getValue().substituteVar(varMapEntry.getKey(), varMapEntry.getValue());
-            optFuncExpr.substituteVar(varMapEntry.getKey(), varMapEntry.getValue());
-        }
+        // Remember original live variables from the index sub tree.
+        List<LogicalVariable> indexSubTreeLiveVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getLiveVariables(indexSubTree.root, indexSubTreeLiveVars);
 
         // Clone the original join condition because we may have to modify it (and we also need the original).        
         ILogicalExpression joinCond = join.getCondition().getValue().cloneExpression();
-
-        
-        List<LogicalVariable> indexSubTreeLiveVars = new ArrayList<LogicalVariable>();
-        VariableUtilities.getLiveVariables(indexSubTree.root, indexSubTreeLiveVars);
-        
-        // Remember original live variables to make sure our new index-based plan returns exactly those vars as well.
-        List<LogicalVariable> originalLiveVars = new ArrayList<LogicalVariable>();
-        VariableUtilities.getLiveVariables(join, originalLiveVars);
-
         // Create "panic" (non indexed) nested-loop join path if necessary.
         Mutable<ILogicalOperator> panicJoinRef = null;
         Map<LogicalVariable, LogicalVariable> panicVarMap = null;
@@ -473,17 +447,12 @@
         SelectOperator topSelect = new SelectOperator(new MutableObject<ILogicalExpression>(joinCond));
         topSelect.getInputs().add(indexSubTree.rootRef);
         topSelect.setExecutionMode(ExecutionMode.LOCAL);
-        context.computeAndSetTypeEnvironmentForOperator(topSelect);        
+        context.computeAndSetTypeEnvironmentForOperator(topSelect);
         ILogicalOperator topOp = topSelect;
-        
+
         // Hook up the indexed-nested loop join path with the "panic" (non indexed) nested-loop join path by putting a union all on top.
         if (panicJoinRef != null) {
-            // Gather live variables from the index plan and the panic plan.
-            //Set<LogicalVariable> indexSubTreeLiveVars = new HashSet<LogicalVariable>();
-            //VariableUtilities.getLiveVariables(indexSubTree.root, indexSubTreeLiveVars);
             indexSubTreeLiveVars.addAll(surrogateSubTreePKs);
-            
-            List<LogicalVariable> indexPlanLiveVars = originalLiveVars;
             List<LogicalVariable> panicPlanLiveVars = new ArrayList<LogicalVariable>();
             VariableUtilities.getLiveVariables(panicJoinRef.getValue(), panicPlanLiveVars);
             // Create variable mapping for union all operator.
@@ -503,49 +472,80 @@
             unionAllOp.setExecutionMode(ExecutionMode.PARTITIONED);
             context.computeAndSetTypeEnvironmentForOperator(unionAllOp);
             topOp = unionAllOp;
-            
-            /* ORIGINAL CODE
-            List<LogicalVariable> indexPlanLiveVars = originalLiveVars;
-            List<LogicalVariable> panicPlanLiveVars = new ArrayList<LogicalVariable>();
-            VariableUtilities.getLiveVariables(panicJoinRef.getValue(), panicPlanLiveVars);
-            if (indexPlanLiveVars.size() != panicPlanLiveVars.size()) {
-                throw new AlgebricksException("Unequal number of variables returned from index plan and panic plan.");
-            }
-            // Create variable mapping for union all operator.
-            List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = new ArrayList<Triple<LogicalVariable, LogicalVariable, LogicalVariable>>();
-            for (int i = 0; i < indexPlanLiveVars.size(); i++) {
-                varMap.add(new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(indexPlanLiveVars.get(i),
-                        panicPlanLiveVars.get(i), indexPlanLiveVars.get(i)));
-            }
-            UnionAllOperator unionAllOp = new UnionAllOperator(varMap);
-            unionAllOp.getInputs().add(new MutableObject<ILogicalOperator>(joinRef.getValue()));
-            unionAllOp.getInputs().add(panicJoinRef);
-            unionAllOp.setExecutionMode(ExecutionMode.PARTITIONED);
-            context.computeAndSetTypeEnvironmentForOperator(unionAllOp);
-            joinRef.setValue(unionAllOp);
-            */
         }
-        
+
         // Place a top-level equi-join on top to retrieve the missing variables from the original probe subtree.
-        // We create an equi join on 'originalSubTreePKs == originalSubTreePKs'.
-        ILogicalExpression eqJoinCondition = null;
-        if (originalSubTreePKs.size() == 1) {
-            List<Mutable<ILogicalExpression>> args = new ArrayList<Mutable<ILogicalExpression>>();
-            args.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(surrogateSubTreePKs.get(0))));
-            args.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(originalSubTreePKs.get(0))));
-            eqJoinCondition = new ScalarFunctionCallExpression(FunctionUtils.getFunctionInfo(AlgebricksBuiltinFunctions.EQ), args);
-        } else {
-            // TODO: Handle composite PKs.
-        }
-        // The inner (probe) branch of the join is the subtree with the secondary-to-primary plan.
-        InnerJoinOperator topEqJoin = new InnerJoinOperator(new MutableObject<ILogicalExpression>(eqJoinCondition),
-                new MutableObject<ILogicalOperator>(topOp), originalProbeSubTreeRootRef);
+        // The inner (build) branch of the join is the subtree with the data scan, since the result of the similarity join could potentially be big.
+        // This choice may not always be the most efficient, but it seems more robust than the alternative.
+        Mutable<ILogicalExpression> eqJoinConditionRef = createPrimaryKeysEqJoinCondition(originalSubTreePKs,
+                surrogateSubTreePKs);
+        InnerJoinOperator topEqJoin = new InnerJoinOperator(eqJoinConditionRef, originalProbeSubTreeRootRef,
+                new MutableObject<ILogicalOperator>(topOp));
         topEqJoin.setExecutionMode(ExecutionMode.PARTITIONED);
         joinRef.setValue(topEqJoin);
         context.computeAndSetTypeEnvironmentForOperator(topEqJoin);
-        
+
         return true;
     }
+    
+    /**
+     * Copies the probeSubTree (using new variables), and reinitializes the probeSubTree to it.
+     * Accordingly replaces the variables in the given joinCond, and the optFuncExpr.
+     * Returns a reference to the original plan root.
+     */
+    private Mutable<ILogicalOperator> copyAndReinitProbeSubTree(OptimizableOperatorSubTree probeSubTree,
+            ILogicalExpression joinCond, IOptimizableFuncExpr optFuncExpr, IOptimizationContext context)
+            throws AlgebricksException {
+        // Copy probe subtree, replacing their variables with new ones. We will use the original variables
+        // to stitch together a top-level equi join.
+        Counter counter = new Counter(context.getVarCounter());
+        LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
+        ILogicalOperator probeSubTreeCopy = deepCopyVisitor.deepCopy(probeSubTree.root, null);
+        Mutable<ILogicalOperator> probeSubTreeRootRef = new MutableObject<ILogicalOperator>(probeSubTreeCopy);
+        context.setVarCounter(counter.get());
+
+        // Remember the original probe subtree, and its primary-key variables,
+        // so we can later retrieve the missing attributes via an equi join.
+        Mutable<ILogicalOperator> originalProbeSubTreeRootRef = probeSubTree.rootRef;
+
+        // Replace the original probe subtree with its copy.
+        Dataset origDataset = probeSubTree.dataset;
+        ARecordType origRecordType = probeSubTree.recordType;
+        probeSubTree.initFromSubTree(probeSubTreeRootRef);
+        inferTypes(probeSubTreeRootRef.getValue(), context);
+        probeSubTree.dataset = origDataset;
+        probeSubTree.recordType = origRecordType;
+
+        // Replace the variables in the join condition and the optimizable function expression
+        // based on the mapping of variables in the copied probe subtree.
+        Map<LogicalVariable, LogicalVariable> varMapping = deepCopyVisitor.getVariableMapping();
+        for (Map.Entry<LogicalVariable, LogicalVariable> varMapEntry : varMapping.entrySet()) {
+            joinCond.substituteVar(varMapEntry.getKey(), varMapEntry.getValue());
+            optFuncExpr.substituteVar(varMapEntry.getKey(), varMapEntry.getValue());
+        }
+        return originalProbeSubTreeRootRef;
+    }
+    
+    private Mutable<ILogicalExpression> createPrimaryKeysEqJoinCondition(List<LogicalVariable> originalSubTreePKs,
+            List<LogicalVariable> surrogateSubTreePKs) {
+        List<Mutable<ILogicalExpression>> eqExprs = new ArrayList<Mutable<ILogicalExpression>>();
+        int numPKVars = originalSubTreePKs.size();
+        for (int i = 0; i < numPKVars; i++) {
+            List<Mutable<ILogicalExpression>> args = new ArrayList<Mutable<ILogicalExpression>>();
+            args.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(surrogateSubTreePKs.get(i))));
+            args.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(originalSubTreePKs.get(i))));
+            ILogicalExpression eqFunc = new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AlgebricksBuiltinFunctions.EQ), args);
+            eqExprs.add(new MutableObject<ILogicalExpression>(eqFunc));
+        }
+        if (eqExprs.size() == 1) {
+            return eqExprs.get(0);
+        } else {
+            ILogicalExpression andFunc = new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AlgebricksBuiltinFunctions.AND), eqExprs);
+            return new MutableObject<ILogicalExpression>(andFunc);
+        }
+    }
 
     private Mutable<ILogicalOperator> createPanicNestedLoopJoinPlan(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree,
@@ -574,8 +574,9 @@
         Counter counter = new Counter(context.getVarCounter());
         LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
         ILogicalOperator scanSubTree = deepCopyVisitor.deepCopy(indexSubTree.root, null);
-        context.setVarCounter(counter.get());        
-        panicVarMap.putAll(deepCopyVisitor.getVariableMapping());
+        context.setVarCounter(counter.get());
+        Map<LogicalVariable, LogicalVariable> copyVarMap = deepCopyVisitor.getVariableMapping();
+        panicVarMap.putAll(copyVarMap);
 
         List<LogicalVariable> copyLiveVars = new ArrayList<LogicalVariable>();
         VariableUtilities.getLiveVariables(scanSubTree, copyLiveVars);
@@ -585,6 +586,12 @@
         // changed variables. 
         // TODO: Use the variable mapping from the deepcopy visitor to do this.
         InnerJoinOperator joinOp = (InnerJoinOperator) joinRef.getValue();
+        /*
+        for (Map.Entry<LogicalVariable, LogicalVariable> entry : copyVarMap.entrySet()) {
+            joinOp.getCondition().getValue().substituteVar(entry.getKey(), entry.getValue());
+        }
+        */
+        
         // Substitute vars in the join condition due to copying of the scanSubTree.
         List<LogicalVariable> joinCondUsedVars = new ArrayList<LogicalVariable>();
         VariableUtilities.getUsedVariables(joinOp, joinCondUsedVars);