Saving intermediate state of index NL rewrite using surrogates.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fuzzy_perf@876 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
index a573df3..767d2ca 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
@@ -383,4 +383,8 @@
             throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
+    
+    public Map<LogicalVariable, LogicalVariable> getVariableMapping() {
+        return variableMapping;
+    }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 51a20bf..e59fd8b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -29,7 +29,6 @@
 import edu.uci.ics.asterix.optimizer.rules.FuzzyJoinRule;
 import edu.uci.ics.asterix.optimizer.rules.IfElseToSwitchCaseFunctionRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
-import edu.uci.ics.asterix.optimizer.rules.IntroduceProjectRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceStaticTypeCastRule;
 import edu.uci.ics.asterix.optimizer.rules.LoadRecordFieldsRule;
@@ -70,6 +69,7 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceAggregateCombinerRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByCombinerRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForSubplanRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceProjectsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushAssignDownThroughProductRule;
@@ -80,7 +80,6 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectDownRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectIntoJoinRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSubplanWithAggregateDownThroughProductRule;
-import edu.uci.ics.asterix.optimizer.rules.PushAggFuncIntoStandaloneAggregateRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.ReinferAllTypesRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.RemoveRedundantGroupByDecorVars;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.RemoveRedundantVariablesRule;
@@ -192,6 +191,7 @@
         accessMethod.add(new IntroduceSelectAccessMethodRule());
         accessMethod.add(new IntroduceJoinAccessMethodRule());
         accessMethod.add(new IntroduceSecondaryIndexInsertDeleteRule());        
+        accessMethod.add(new RemoveUnusedAssignAndAggregateRule());
         return accessMethod;
     }
 
@@ -219,7 +219,7 @@
         physicalRewritesAllLevels.add(new SetClosedRecordConstructorsRule());
         physicalRewritesAllLevels.add(new PullPositionalVariableFromUnnestRule());
         //physicalRewritesAllLevels.add(new IntroduceProjectRule());
-        physicalRewritesAllLevels.add(new PushProjectDownRule());        
+        physicalRewritesAllLevels.add(new PushProjectDownRule());
         physicalRewritesAllLevels.add(new InsertProjectBeforeUnionRule());
         physicalRewritesAllLevels.add(new InlineSingleReferenceVariablesRule());
         physicalRewritesAllLevels.add(new RemoveUnusedAssignAndAggregateRule());
@@ -233,6 +233,8 @@
         List<IAlgebraicRewriteRule> physicalRewritesTopLevel = new LinkedList<IAlgebraicRewriteRule>();
         physicalRewritesTopLevel.add(new PushNestedOrderByUnderPreSortedGroupByRule());
         physicalRewritesTopLevel.add(new PushLimitDownRule());
+        //physicalRewritesTopLevel.add(new IntroduceProjectsRule());
+        //physicalRewritesTopLevel.add(new SetAlgebricksPhysicalOperatorsRule());
         return physicalRewritesTopLevel;
     }
 
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IOptimizableFuncExpr.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IOptimizableFuncExpr.java
index dd91fc1..aa38ce9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IOptimizableFuncExpr.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IOptimizableFuncExpr.java
@@ -22,4 +22,5 @@
     
     public int findLogicalVar(LogicalVariable var);
     public int findFieldName(String fieldName);
+    public void substituteVar(LogicalVariable original, LogicalVariable substitution);
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 0ab749d..1826ad3 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -1,8 +1,11 @@
 package edu.uci.ics.asterix.optimizer.rules.am;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -44,6 +47,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
@@ -403,20 +407,60 @@
         }
         IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
 
-        // Clone the original join condition because we may have to modify it (and we also need the original).
+        // Copy probe subtree, replacing their variables with new ones. We will use the original variables
+        // to stitch together a top-level equi join.
+        Counter counter = new Counter(context.getVarCounter());
+        LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
+        ILogicalOperator probeSubTreeCopy = deepCopyVisitor.deepCopy(probeSubTree.root, null);
+        Mutable<ILogicalOperator> probeSubTreeRootRef = new MutableObject<ILogicalOperator>(probeSubTreeCopy);
+        context.setVarCounter(counter.get());
+        
+        // Remember the original probe subtree, and its primary-key variables,
+        // so we can later retrieve the missing attributes via an equi join.
+        Mutable<ILogicalOperator> originalProbeSubTreeRootRef = probeSubTree.rootRef;
+        List<LogicalVariable> originalSubTreePKs = new ArrayList<LogicalVariable>();
+        probeSubTree.getPrimaryKeyVars(originalSubTreePKs);
+        
+        // Replace the original probe subtree with its copy.
+        Dataset origDataset = probeSubTree.dataset;
+        ARecordType origRecordType = probeSubTree.recordType;
+        probeSubTree.initFromSubTree(probeSubTreeRootRef);
+        inferTypes(probeSubTreeRootRef.getValue(), context);
+        probeSubTree.dataset = origDataset;
+        probeSubTree.recordType = origRecordType;
+
+        // Remember the primary-keys of the new probe subtree for the top-level equi join.
+        List<LogicalVariable> surrogateSubTreePKs = new ArrayList<LogicalVariable>();
+        probeSubTree.getPrimaryKeyVars(surrogateSubTreePKs);
+        
         InnerJoinOperator join = (InnerJoinOperator) joinRef.getValue();
+        // Replace the variables in the join condition and the optimizable function expression
+        // based on the mapping of variables in the copied probe subtree.
+        Map<LogicalVariable, LogicalVariable> varMapping = deepCopyVisitor.getVariableMapping();
+        for (Map.Entry<LogicalVariable, LogicalVariable> varMapEntry : varMapping.entrySet()) {
+            join.getCondition().getValue().substituteVar(varMapEntry.getKey(), varMapEntry.getValue());
+            optFuncExpr.substituteVar(varMapEntry.getKey(), varMapEntry.getValue());
+        }
+
+        // Clone the original join condition because we may have to modify it (and we also need the original).        
         ILogicalExpression joinCond = join.getCondition().getValue().cloneExpression();
 
+        
+        List<LogicalVariable> indexSubTreeLiveVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getLiveVariables(indexSubTree.root, indexSubTreeLiveVars);
+        
         // Remember original live variables to make sure our new index-based plan returns exactly those vars as well.
         List<LogicalVariable> originalLiveVars = new ArrayList<LogicalVariable>();
         VariableUtilities.getLiveVariables(join, originalLiveVars);
 
         // Create "panic" (non indexed) nested-loop join path if necessary.
         Mutable<ILogicalOperator> panicJoinRef = null;
+        Map<LogicalVariable, LogicalVariable> panicVarMap = null;
         if (optFuncExpr.getFuncExpr().getFunctionIdentifier() == AsterixBuiltinFunctions.EDIT_DISTANCE_CHECK) {
             panicJoinRef = new MutableObject<ILogicalOperator>(joinRef.getValue());
+            panicVarMap = new HashMap<LogicalVariable, LogicalVariable>();
             Mutable<ILogicalOperator> newProbeRootRef = createPanicNestedLoopJoinPlan(panicJoinRef, indexSubTree,
-                    probeSubTree, optFuncExpr, chosenIndex, context);
+                    probeSubTree, optFuncExpr, chosenIndex, panicVarMap, context);
             probeSubTree.rootRef.setValue(newProbeRootRef.getValue());
             probeSubTree.root = newProbeRootRef.getValue();
         }
@@ -429,12 +473,38 @@
         SelectOperator topSelect = new SelectOperator(new MutableObject<ILogicalExpression>(joinCond));
         topSelect.getInputs().add(indexSubTree.rootRef);
         topSelect.setExecutionMode(ExecutionMode.LOCAL);
-        context.computeAndSetTypeEnvironmentForOperator(topSelect);
-        joinRef.setValue(topSelect);
-
+        context.computeAndSetTypeEnvironmentForOperator(topSelect);        
+        ILogicalOperator topOp = topSelect;
+        
         // Hook up the indexed-nested loop join path with the "panic" (non indexed) nested-loop join path by putting a union all on top.
         if (panicJoinRef != null) {
             // Gather live variables from the index plan and the panic plan.
+            //Set<LogicalVariable> indexSubTreeLiveVars = new HashSet<LogicalVariable>();
+            //VariableUtilities.getLiveVariables(indexSubTree.root, indexSubTreeLiveVars);
+            indexSubTreeLiveVars.addAll(surrogateSubTreePKs);
+            
+            List<LogicalVariable> indexPlanLiveVars = originalLiveVars;
+            List<LogicalVariable> panicPlanLiveVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getLiveVariables(panicJoinRef.getValue(), panicPlanLiveVars);
+            // Create variable mapping for union all operator.
+            List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = new ArrayList<Triple<LogicalVariable, LogicalVariable, LogicalVariable>>();
+            for (int i = 0; i < indexSubTreeLiveVars.size(); i++) {
+                LogicalVariable indexSubTreeVar = indexSubTreeLiveVars.get(i);
+                LogicalVariable panicPlanVar = panicVarMap.get(indexSubTreeVar);
+                if (panicPlanVar == null) {
+                    panicPlanVar = indexSubTreeVar;
+                }
+                varMap.add(new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(indexSubTreeVar, panicPlanVar,
+                        indexSubTreeVar));
+            }
+            UnionAllOperator unionAllOp = new UnionAllOperator(varMap);
+            unionAllOp.getInputs().add(new MutableObject<ILogicalOperator>(topOp));
+            unionAllOp.getInputs().add(panicJoinRef);
+            unionAllOp.setExecutionMode(ExecutionMode.PARTITIONED);
+            context.computeAndSetTypeEnvironmentForOperator(unionAllOp);
+            topOp = unionAllOp;
+            
+            /* ORIGINAL CODE
             List<LogicalVariable> indexPlanLiveVars = originalLiveVars;
             List<LogicalVariable> panicPlanLiveVars = new ArrayList<LogicalVariable>();
             VariableUtilities.getLiveVariables(panicJoinRef.getValue(), panicPlanLiveVars);
@@ -453,14 +523,34 @@
             unionAllOp.setExecutionMode(ExecutionMode.PARTITIONED);
             context.computeAndSetTypeEnvironmentForOperator(unionAllOp);
             joinRef.setValue(unionAllOp);
+            */
         }
+        
+        // Place a top-level equi-join on top to retrieve the missing variables from the original probe subtree.
+        // We create an equi join on 'originalSubTreePKs == originalSubTreePKs'.
+        ILogicalExpression eqJoinCondition = null;
+        if (originalSubTreePKs.size() == 1) {
+            List<Mutable<ILogicalExpression>> args = new ArrayList<Mutable<ILogicalExpression>>();
+            args.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(surrogateSubTreePKs.get(0))));
+            args.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(originalSubTreePKs.get(0))));
+            eqJoinCondition = new ScalarFunctionCallExpression(FunctionUtils.getFunctionInfo(AlgebricksBuiltinFunctions.EQ), args);
+        } else {
+            // TODO: Handle composite PKs.
+        }
+        // The inner (probe) branch of the join is the subtree with the secondary-to-primary plan.
+        InnerJoinOperator topEqJoin = new InnerJoinOperator(new MutableObject<ILogicalExpression>(eqJoinCondition),
+                new MutableObject<ILogicalOperator>(topOp), originalProbeSubTreeRootRef);
+        topEqJoin.setExecutionMode(ExecutionMode.PARTITIONED);
+        joinRef.setValue(topEqJoin);
+        context.computeAndSetTypeEnvironmentForOperator(topEqJoin);
+        
         return true;
     }
 
     private Mutable<ILogicalOperator> createPanicNestedLoopJoinPlan(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree,
-            IOptimizableFuncExpr optFuncExpr, Index chosenIndex, IOptimizationContext context)
-            throws AlgebricksException {
+            IOptimizableFuncExpr optFuncExpr, Index chosenIndex, Map<LogicalVariable, LogicalVariable> panicVarMap,
+            IOptimizationContext context) throws AlgebricksException {
         LogicalVariable inputSearchVar = getInputSearchVar(optFuncExpr, indexSubTree);
 
         // We split the plan into two "branches", and add selections on each side.
@@ -470,8 +560,8 @@
         context.computeAndSetTypeEnvironmentForOperator(replicateOp);
 
         // Create select ops for removing tuples that are filterable and not filterable, respectively.
-        IVariableTypeEnvironment topTypeEnv = context.getOutputTypeEnvironment(joinRef.getValue());
-        IAType inputSearchVarType = (IAType) topTypeEnv.getVarType(inputSearchVar);
+        IVariableTypeEnvironment probeTypeEnv = context.getOutputTypeEnvironment(probeSubTree.root);
+        IAType inputSearchVarType = (IAType) probeTypeEnv.getVarType(inputSearchVar);
         Mutable<ILogicalOperator> isFilterableSelectOpRef = new MutableObject<ILogicalOperator>();
         Mutable<ILogicalOperator> isNotFilterableSelectOpRef = new MutableObject<ILogicalOperator>();
         createIsFilterableSelectOps(replicateOp, inputSearchVar, inputSearchVarType, optFuncExpr, chosenIndex, context,
@@ -484,7 +574,8 @@
         Counter counter = new Counter(context.getVarCounter());
         LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
         ILogicalOperator scanSubTree = deepCopyVisitor.deepCopy(indexSubTree.root, null);
-        context.setVarCounter(counter.get());
+        context.setVarCounter(counter.get());        
+        panicVarMap.putAll(deepCopyVisitor.getVariableMapping());
 
         List<LogicalVariable> copyLiveVars = new ArrayList<LogicalVariable>();
         VariableUtilities.getLiveVariables(scanSubTree, copyLiveVars);
@@ -492,6 +583,7 @@
         // Replace the inputs of the given join op, and replace variables in its
         // condition since we deep-copied one of the scanner subtrees which
         // changed variables. 
+        // TODO: Use the variable mapping from the deepcopy visitor to do this.
         InnerJoinOperator joinOp = (InnerJoinOperator) joinRef.getValue();
         // Substitute vars in the join condition due to copying of the scanSubTree.
         List<LogicalVariable> joinCondUsedVars = new ArrayList<LogicalVariable>();
@@ -816,4 +908,11 @@
             }
         }
     }
+    
+    private void inferTypes(ILogicalOperator op, IOptimizationContext context) throws AlgebricksException {
+        for(Mutable<ILogicalOperator> childOpRef : op.getInputs()) {
+            inferTypes(childOpRef.getValue(), context);
+        }
+        context.computeAndSetTypeEnvironmentForOperator(op);
+    }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableFuncExpr.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableFuncExpr.java
index 13e515a..dbbe6d9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableFuncExpr.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableFuncExpr.java
@@ -96,4 +96,16 @@
     public OptimizableOperatorSubTree getOperatorSubTree(int index) {
         return subTrees[index];
     }
+
+    @Override
+    public void substituteVar(LogicalVariable original, LogicalVariable substitution) {
+        if (logicalVars != null) {
+            for (int i = 0; i < logicalVars.length; i++) {
+                if (logicalVars[i] == original) {
+                    logicalVars[i] = substitution;
+                    break;
+                }
+            }
+        }
+    }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
index 3269be7..fd1b89d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
@@ -8,6 +8,7 @@
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -16,6 +17,7 @@
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -124,4 +126,11 @@
         dataset = null;
         recordType = null;
     }
+    
+    public void getPrimaryKeyVars(List<LogicalVariable> target) {
+        int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            target.add(dataSourceScan.getVariables().get(i));
+        }
+    }
 }