Merged asterix_stabilization r867:r878 and r882:r887.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fuzzy_perf@888 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
index a573df3..767d2ca 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
@@ -383,4 +383,8 @@
             throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
+    
+    public Map<LogicalVariable, LogicalVariable> getVariableMapping() {
+        return variableMapping; // the visitor's own map is handed out as is, not a copy
+    }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 830e33b..d33029b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -39,7 +39,9 @@
 import edu.uci.ics.asterix.optimizer.rules.PushFieldAccessRule;
 import edu.uci.ics.asterix.optimizer.rules.PushGroupByThroughProduct;
 import edu.uci.ics.asterix.optimizer.rules.PushProperJoinThroughProduct;
+import edu.uci.ics.asterix.optimizer.rules.PushSimilarityFunctionsBelowJoin;
 import edu.uci.ics.asterix.optimizer.rules.RemoveRedundantListifyRule;
+import edu.uci.ics.asterix.optimizer.rules.RemoveUnusedOneToOneEquiJoinRule;
 import edu.uci.ics.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule;
 import edu.uci.ics.asterix.optimizer.rules.SetClosedRecordConstructorsRule;
 import edu.uci.ics.asterix.optimizer.rules.SimilarityCheckRule;
@@ -190,7 +192,10 @@
         List<IAlgebraicRewriteRule> accessMethod = new LinkedList<IAlgebraicRewriteRule>();
         accessMethod.add(new IntroduceSelectAccessMethodRule());
         accessMethod.add(new IntroduceJoinAccessMethodRule());
-        accessMethod.add(new IntroduceSecondaryIndexInsertDeleteRule());        
+        accessMethod.add(new IntroduceSecondaryIndexInsertDeleteRule());
+        accessMethod.add(new RemoveUnusedOneToOneEquiJoinRule());
+        accessMethod.add(new PushSimilarityFunctionsBelowJoin());
+        accessMethod.add(new RemoveUnusedAssignAndAggregateRule());
         return accessMethod;
     }
 
@@ -198,6 +203,7 @@
         List<IAlgebraicRewriteRule> planCleanupRules = new LinkedList<IAlgebraicRewriteRule>();        
         planCleanupRules.add(new PushAssignBelowUnionAllRule());
         planCleanupRules.add(new ExtractCommonExpressionsRule());
+        planCleanupRules.add(new RemoveRedundantVariablesRule());
         planCleanupRules.add(new PushProjectDownRule());
         planCleanupRules.add(new PushSelectDownRule());
         planCleanupRules.add(new RemoveUnusedAssignAndAggregateRule());
@@ -219,6 +225,7 @@
         physicalRewritesAllLevels.add(new IntroHashPartitionMergeExchange());
         physicalRewritesAllLevels.add(new SetClosedRecordConstructorsRule());
         physicalRewritesAllLevels.add(new PullPositionalVariableFromUnnestRule());
+        //physicalRewritesAllLevels.add(new IntroduceProjectRule());
         physicalRewritesAllLevels.add(new PushProjectDownRule());
         physicalRewritesAllLevels.add(new InsertProjectBeforeUnionRule());
         physicalRewritesAllLevels.add(new InlineSingleReferenceVariablesRule());
@@ -233,6 +240,8 @@
         List<IAlgebraicRewriteRule> physicalRewritesTopLevel = new LinkedList<IAlgebraicRewriteRule>();
         physicalRewritesTopLevel.add(new PushNestedOrderByUnderPreSortedGroupByRule());
         physicalRewritesTopLevel.add(new PushLimitDownRule());
+        //physicalRewritesTopLevel.add(new IntroduceProjectsRule());
+        //physicalRewritesTopLevel.add(new SetAlgebricksPhysicalOperatorsRule());
         return physicalRewritesTopLevel;
     }
 
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushSimilarityFunctionsBelowJoin.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushSimilarityFunctionsBelowJoin.java
new file mode 100644
index 0000000..69a5fb5
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushSimilarityFunctionsBelowJoin.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Pushes similarity function-call expressions below a join if possible.
+ * Assigns the similarity function-call expressions to new variables, and replaces the original
+ * expression with a corresponding variable reference expression.
+ * This rule can help reduce the cost of computing expensive similarity functions by pushing them below
+ * a join (which may blow up the cardinality).
+ * Also, this rule may help to enable other rules such as common subexpression elimination, again to reduce
+ * the number of calls to expensive similarity functions.
+ * 
+ * Example:
+ * 
+ * Before plan:
+ * assign [$$10] <- [funcA(funcB(simFuncX($$3, $$4)))]
+ *   join (some condition) 
+ *     join_branch_0 where $$3 and $$4 are not live
+ *       ...
+ *     join_branch_1 where $$3 and $$4 are live
+ *       ...
+ * 
+ * After plan:
+ * assign [$$10] <- [funcA(funcB($$11))]
+ *   join (some condition) 
+ *     join_branch_0 where $$3 and $$4 are not live
+ *       ...
+ *     join_branch_1 where $$3 and $$4 are live
+ *       assign[$$11] <- [simFuncX($$3, $$4)]
+ *         ...
+ */
+public class PushSimilarityFunctionsBelowJoin implements IAlgebraicRewriteRule {
+
+    // Identifiers of the similarity functions that this rule tries to push below a join.
+    private static final Set<FunctionIdentifier> simFuncIdents = new HashSet<FunctionIdentifier>();
+    static {
+        simFuncIdents.add(AsterixBuiltinFunctions.SIMILARITY_JACCARD);
+        simFuncIdents.add(AsterixBuiltinFunctions.SIMILARITY_JACCARD_CHECK);
+        simFuncIdents.add(AsterixBuiltinFunctions.EDIT_DISTANCE);
+        simFuncIdents.add(AsterixBuiltinFunctions.EDIT_DISTANCE_CHECK);
+    }
+
+    // Scratch lists, reused across calls; each one is cleared right before it is refilled.
+    private final List<Mutable<ILogicalExpression>> simFuncExprs = new ArrayList<Mutable<ILogicalExpression>>();
+    private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+    private final List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+
+    /** Never changes the plan; all of the work happens in rewritePost(). */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * If opRef is an assign above a join, moves the assign's similarity function calls into new assigns on
+     * the join inputs that provide all variables those calls use.
+     *
+     * @return true if at least one similarity function call was pushed below the join, false otherwise.
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        AssignOperator assignOp = (AssignOperator) op;
+
+        // Find a join operator below this assign.
+        Mutable<ILogicalOperator> joinOpRef = findJoinOp(assignOp.getInputs().get(0));
+        if (joinOpRef == null) {
+            return false;
+        }
+        AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) joinOpRef.getValue();
+
+        // Check if the assign uses a similarity function that we wish to push below the join if possible.
+        simFuncExprs.clear();
+        gatherSimilarityFunctionCalls(assignOp, simFuncExprs);
+        if (simFuncExprs.isEmpty()) {
+            return false;
+        }
+
+        // Try both join inputs. Calls pushed down input 0 are removed from simFuncExprs before input 1 is tried.
+        boolean pushedLeft = pushDownSimilarityFunctions(joinOp, 0, simFuncExprs, context);
+        boolean pushedRight = pushDownSimilarityFunctions(joinOp, 1, simFuncExprs, context);
+        if (pushedLeft || pushedRight) {
+            context.computeAndSetTypeEnvironmentForOperator(joinOp);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Returns the first inner or left-outer join found by a depth-first search from opRef, or null if there is
+     * none. The search does not descend below group, aggregate, distinct and unnest-map operators.
+     */
+    private Mutable<ILogicalOperator> findJoinOp(Mutable<ILogicalOperator> opRef) {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        switch (op.getOperatorTag()) {
+            case INNERJOIN:
+            case LEFTOUTERJOIN: {
+                return opRef;
+            }
+            // Bail on these operators.
+            case GROUP:
+            case AGGREGATE:
+            case DISTINCT:
+            case UNNEST_MAP: {
+                return null;
+            }
+            // Traverse children, and stop at the first input whose subtree contains a join.
+            default: {
+                for (Mutable<ILogicalOperator> childOpRef : op.getInputs()) {
+                    Mutable<ILogicalOperator> joinOpRef = findJoinOp(childOpRef);
+                    if (joinOpRef != null) {
+                        return joinOpRef;
+                    }
+                }
+                return null;
+            }
+        }
+    }
+
+    /** Adds to simFuncExprs the similarity function calls found in any of the expressions of assignOp. */
+    private void gatherSimilarityFunctionCalls(AssignOperator assignOp, List<Mutable<ILogicalExpression>> simFuncExprs) {
+        for (Mutable<ILogicalExpression> exprRef : assignOp.getExpressions()) {
+            gatherSimilarityFunctionCalls(exprRef, simFuncExprs);
+        }
+    }
+
+    /** Adds to simFuncExprs the similarity function calls in exprRef's expression tree, outer calls first. */
+    private void gatherSimilarityFunctionCalls(Mutable<ILogicalExpression> exprRef,
+            List<Mutable<ILogicalExpression>> simFuncExprs) {
+        AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return;
+        }
+        // Check whether the function is a similarity function.
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        if (simFuncIdents.contains(funcExpr.getFunctionIdentifier())) {
+            simFuncExprs.add(exprRef);
+        }
+        // Traverse arguments.
+        for (Mutable<ILogicalExpression> funcArg : funcExpr.getArguments()) {
+            gatherSimilarityFunctionCalls(funcArg, simFuncExprs);
+        }
+    }
+
+    /**
+     * Pushes every similarity function call in simFuncExprs that only uses variables live in the given join
+     * input into one new assign operator placed between the join and that input. Each pushed call is replaced
+     * in its original expression by a reference to a fresh variable, and is removed from simFuncExprs.
+     *
+     * @param joinOp join whose input is considered.
+     * @param inputIndex index of the join input to push calls into.
+     * @param simFuncExprs candidate similarity function calls; pushed calls are removed from this list.
+     * @param context supplies the fresh variables, and computes the type environment of the new assign.
+     * @return true if a new assign was inserted below the join, false otherwise.
+     */
+    private boolean pushDownSimilarityFunctions(AbstractBinaryJoinOperator joinOp, int inputIndex,
+            List<Mutable<ILogicalExpression>> simFuncExprs, IOptimizationContext context) throws AlgebricksException {
+        ILogicalOperator joinInputOp = joinOp.getInputs().get(inputIndex).getValue();
+        liveVars.clear();
+        VariableUtilities.getLiveVariables(joinInputOp, liveVars);
+        List<LogicalVariable> assignVars = new ArrayList<LogicalVariable>();
+        List<Mutable<ILogicalExpression>> assignExprs = new ArrayList<Mutable<ILogicalExpression>>();
+        // Iterate explicitly, so that pushed calls can be removed from simFuncExprs along the way.
+        Iterator<Mutable<ILogicalExpression>> simFuncIter = simFuncExprs.iterator();
+        while (simFuncIter.hasNext()) {
+            Mutable<ILogicalExpression> simFuncExprRef = simFuncIter.next();
+            ILogicalExpression simFuncExpr = simFuncExprRef.getValue();
+            usedVars.clear();
+            simFuncExpr.getUsedVariables(usedVars);
+            // Check if we can push the similarity function down this branch.
+            if (!liveVars.containsAll(usedVars)) {
+                continue;
+            }
+            // Replace the original expression with a variable reference expression.
+            LogicalVariable replacementVar = context.newVar();
+            assignVars.add(replacementVar);
+            assignExprs.add(new MutableObject<ILogicalExpression>(simFuncExpr));
+            simFuncExprRef.setValue(new VariableReferenceExpression(replacementVar));
+            simFuncIter.remove();
+        }
+        if (assignVars.isEmpty()) {
+            return false;
+        }
+        // Create new assign operator below the join for the similarity functions that were pushed.
+        AssignOperator newAssign = new AssignOperator(assignVars, assignExprs);
+        newAssign.getInputs().add(new MutableObject<ILogicalOperator>(joinInputOp));
+        newAssign.setExecutionMode(joinOp.getExecutionMode());
+        joinOp.getInputs().get(inputIndex).setValue(newAssign);
+        context.computeAndSetTypeEnvironmentForOperator(newAssign);
+        return true;
+    }
+}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
new file mode 100644
index 0000000..9089c42
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Removes join operators for which all of the following conditions are true:
+ * 1. The live variables of one input branch of the join are not used in the upstream plan
+ * 2. The join is an inner equi join
+ * 3. The join condition only uses variables that correspond to primary keys of the same dataset    
+ * Notice that the last condition implies a 1:1 join, i.e., the join does not change the result cardinality.
+ * 
+ * Joins that satisfy the above conditions may be introduced by other rules 
+ * which use surrogate optimizations. Such an optimization aims to reduce data copies and communication costs by
+ * using the primary keys as surrogates for the desired data items. Typically,
+ * such a surrogate-based plan introduces a top-level join to finally resolve
+ * the surrogates to the desired data items. 
+ * In case the upstream plan does not require the original data items at all, such a top-level join is unnecessary.
+ * The purpose of this rule is to remove such unnecessary joins.
+ */
+public class RemoveUnusedOneToOneEquiJoinRule implements IAlgebraicRewriteRule {
+
+    // Variables used by the operators visited so far. The set only grows during the traversal, so it may also
+    // hold variables used in subtrees visited earlier, which keeps the "unused branch" test conservative.
+    private final Set<LogicalVariable> parentsUsedVars = new HashSet<LogicalVariable>();
+    // Scratch lists, reused across calls; each one is cleared right before it is refilled.
+    private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+    private final List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+    private final List<LogicalVariable> pkVars = new ArrayList<LogicalVariable>();
+    private final List<DataSourceScanOperator> dataScans = new ArrayList<DataSourceScanOperator>();
+    // Set by the first call to rewritePre(), which traverses the plan below its operator by itself.
+    private boolean hasRun = false;
+
+    /**
+     * On its first invocation, traverses the plan below opRef and removes the unused one-to-one equi joins
+     * that it finds. All later invocations on this rule instance do nothing.
+     *
+     * @return true if at least one join was removed, false otherwise.
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        if (hasRun) {
+            return false;
+        }
+        hasRun = true;
+        return removeUnusedJoin(opRef);
+    }
+
+    /** Never changes the plan; all of the work happens in rewritePre(). */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Visits the plan rooted at opRef top down. Each input of a visited operator that is a removable join
+     * (as decided by removeJoinFromInputBranch()) is replaced by that join's used input branch.
+     *
+     * @return true if at least one join was removed, false otherwise.
+     */
+    private boolean removeUnusedJoin(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        boolean modified = false;
+
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(op, usedVars);
+        // Propagate used variables from parents downwards.
+        parentsUsedVars.addAll(usedVars);
+
+        int numInputs = op.getInputs().size();
+        for (int i = 0; i < numInputs; i++) {
+            Mutable<ILogicalOperator> childOpRef = op.getInputs().get(i);
+            int unusedJoinBranchIndex = removeJoinFromInputBranch(childOpRef);
+            if (unusedJoinBranchIndex >= 0) {
+                int usedBranchIndex = (unusedJoinBranchIndex == 0) ? 1 : 0;
+                // Remove join at input index i, by hooking up op's input i with
+                // the join's branch at usedBranchIndex.
+                AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) childOpRef.getValue();
+                childOpRef = joinOp.getInputs().get(usedBranchIndex);
+                op.getInputs().set(i, childOpRef);
+                modified = true;
+            }
+            // Descend into the operator that is now hooked up at input i. A removed join is skipped, so its
+            // condition variables do not count as used upstream, and its unused branch is not visited.
+            if (removeUnusedJoin(childOpRef)) {
+                modified = true;
+            }
+        }
+        return modified;
+    }
+
+    /**
+     * Checks whether the operator in opRef is a join that this rule can remove: an inner equi join with an
+     * input branch whose live variables are not used upstream, and whose condition only uses primary-key
+     * variables of datasource scans of one and the same dataset. The plan itself is not modified here.
+     *
+     * @return the index of the join's unused input branch if the join can be removed, -1 otherwise.
+     */
+    private int removeJoinFromInputBranch(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN) {
+            return -1;
+        }
+
+        AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) op;
+        // Make sure the join is an equi-join.
+        if (!isEquiJoin(joinOp.getCondition())) {
+            return -1;
+        }
+
+        // Check whether one of the join branches is unused.
+        int unusedJoinBranchIndex = -1;
+        for (int i = 0; i < joinOp.getInputs().size(); i++) {
+            liveVars.clear();
+            VariableUtilities.getLiveVariables(joinOp.getInputs().get(i).getValue(), liveVars);
+            liveVars.retainAll(parentsUsedVars);
+            if (liveVars.isEmpty()) {
+                // None of the live variables from this branch are used by its parents.
+                unusedJoinBranchIndex = i;
+                break;
+            }
+        }
+        if (unusedJoinBranchIndex < 0) {
+            // The variables from both branches are used in the upstream plan. We cannot remove this join.
+            return -1;
+        }
+
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(joinOp, usedVars);
+
+        // Check whether all used variables originate from primary keys of exactly the same dataset.
+        // Collect a list of datascans whose primary key variables are used in the join condition.
+        // The list is a reused field, so first drop the scans gathered for previously examined joins.
+        dataScans.clear();
+        gatherProducingDataScans(opRef, usedVars, dataScans);
+        if (dataScans.isEmpty()) {
+            // The join condition does not cover the primary key of any datascan below the join.
+            return -1;
+        }
+
+        // Check that all datascans scan the same dataset, and that the join condition
+        // only used primary key variables of those datascans.
+        for (int i = 0; i < dataScans.size(); i++) {
+            if (i > 0) {
+                AqlDataSource prevAqlDataSource = (AqlDataSource) dataScans.get(i - 1).getDataSource();
+                AqlDataSource currAqlDataSource = (AqlDataSource) dataScans.get(i).getDataSource();
+                if (!prevAqlDataSource.getDataset().equals(currAqlDataSource.getDataset())) {
+                    return -1;
+                }
+            }
+            // Remove from the used variables all the primary key vars of this dataset.
+            fillPKVars(dataScans.get(i), pkVars);
+            usedVars.removeAll(pkVars);
+        }
+        if (!usedVars.isEmpty()) {
+            // The join condition also uses some other variables that are not primary
+            // keys from datasource scans of the same dataset.
+            return -1;
+        }
+        return unusedJoinBranchIndex;
+    }
+
+    /**
+     * Adds to dataScans every datasource scan in the plan rooted at opRef whose primary-key variables are
+     * all contained in joinUsedVars. The search stops at datasource scans; it does not look below them.
+     */
+    private void gatherProducingDataScans(Mutable<ILogicalOperator> opRef, List<LogicalVariable> joinUsedVars,
+            List<DataSourceScanOperator> dataScans) {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
+            for (Mutable<ILogicalOperator> inputOp : op.getInputs()) {
+                gatherProducingDataScans(inputOp, joinUsedVars, dataScans);
+            }
+            return;
+        }
+        DataSourceScanOperator dataScan = (DataSourceScanOperator) op;
+        fillPKVars(dataScan, pkVars);
+        // Check if join uses all PK vars.
+        if (joinUsedVars.containsAll(pkVars)) {
+            dataScans.add(dataScan);
+        }
+    }
+
+    /**
+     * Replaces the contents of pkVars with the first n variables of dataScan, where n is the number of
+     * partitioning keys of the scanned dataset (presumably the scan lists its primary-key variables first).
+     */
+    private void fillPKVars(DataSourceScanOperator dataScan, List<LogicalVariable> pkVars) {
+        pkVars.clear();
+        AqlDataSource aqlDataSource = (AqlDataSource) dataScan.getDataSource();
+        int numPKs = DatasetUtils.getPartitioningKeys(aqlDataSource.getDataset()).size();
+        for (int i = 0; i < numPKs; i++) {
+            pkVars.add(dataScan.getVariables().get(i));
+        }
+    }
+
+    /**
+     * Checks whether conditionExpr is an equality, or a conjunction whose arguments all pass this same check.
+     * The arguments of the equalities themselves are not inspected here.
+     *
+     * @return true if the condition is an 'eq' call, or an 'and' call over such conditions, false otherwise.
+     */
+    private boolean isEquiJoin(Mutable<ILogicalExpression> conditionExpr) {
+        AbstractLogicalExpression expr = (AbstractLogicalExpression) conditionExpr.getValue();
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            // Constants and plain variable references do not equate anything.
+            return false;
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        FunctionIdentifier funcIdent = funcExpr.getFunctionIdentifier();
+        if (AlgebricksBuiltinFunctions.EQ.equals(funcIdent)) {
+            return true;
+        }
+        if (!AlgebricksBuiltinFunctions.AND.equals(funcIdent)) {
+            return false;
+        }
+        // Every conjunct must itself be an equality, otherwise the join is not a pure equi join.
+        for (Mutable<ILogicalExpression> conjunct : funcExpr.getArguments()) {
+            if (!isEquiJoin(conjunct)) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IOptimizableFuncExpr.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IOptimizableFuncExpr.java
index dd91fc1..aa38ce9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IOptimizableFuncExpr.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IOptimizableFuncExpr.java
@@ -22,4 +22,5 @@
     
     public int findLogicalVar(LogicalVariable var);
     public int findFieldName(String fieldName);
+    public void substituteVar(LogicalVariable original, LogicalVariable substitution);
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 0ab749d..308fcc9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -1,8 +1,10 @@
 package edu.uci.ics.asterix.optimizer.rules.am;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -44,6 +46,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
@@ -402,21 +405,36 @@
             probeSubTree = leftSubTree;
         }
         IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
-
-        // Clone the original join condition because we may have to modify it (and we also need the original).
         InnerJoinOperator join = (InnerJoinOperator) joinRef.getValue();
+
+        // Remember the original probe subtree, and its primary-key variables,
+        // so we can later retrieve the missing attributes via an equi join.
+        List<LogicalVariable> originalSubTreePKs = new ArrayList<LogicalVariable>();
+        probeSubTree.getPrimaryKeyVars(originalSubTreePKs);
+        
+        // Copy probe subtree, replacing their variables with new ones. We will use the original variables
+        // to stitch together a top-level equi join.
+        Mutable<ILogicalOperator> originalProbeSubTreeRootRef = copyAndReinitProbeSubTree(probeSubTree, join
+                .getCondition().getValue(), optFuncExpr, context);
+
+        // Remember the primary-keys of the new probe subtree for the top-level equi join.
+        List<LogicalVariable> surrogateSubTreePKs = new ArrayList<LogicalVariable>();
+        probeSubTree.getPrimaryKeyVars(surrogateSubTreePKs);
+        
+        // Remember original live variables from the index sub tree.
+        List<LogicalVariable> indexSubTreeLiveVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getLiveVariables(indexSubTree.root, indexSubTreeLiveVars);
+
+        // Clone the original join condition because we may have to modify it (and we also need the original).        
         ILogicalExpression joinCond = join.getCondition().getValue().cloneExpression();
-
-        // Remember original live variables to make sure our new index-based plan returns exactly those vars as well.
-        List<LogicalVariable> originalLiveVars = new ArrayList<LogicalVariable>();
-        VariableUtilities.getLiveVariables(join, originalLiveVars);
-
         // Create "panic" (non indexed) nested-loop join path if necessary.
         Mutable<ILogicalOperator> panicJoinRef = null;
+        Map<LogicalVariable, LogicalVariable> panicVarMap = null;
         if (optFuncExpr.getFuncExpr().getFunctionIdentifier() == AsterixBuiltinFunctions.EDIT_DISTANCE_CHECK) {
             panicJoinRef = new MutableObject<ILogicalOperator>(joinRef.getValue());
+            panicVarMap = new HashMap<LogicalVariable, LogicalVariable>();
             Mutable<ILogicalOperator> newProbeRootRef = createPanicNestedLoopJoinPlan(panicJoinRef, indexSubTree,
-                    probeSubTree, optFuncExpr, chosenIndex, context);
+                    probeSubTree, optFuncExpr, chosenIndex, panicVarMap, context);
             probeSubTree.rootRef.setValue(newProbeRootRef.getValue());
             probeSubTree.root = newProbeRootRef.getValue();
         }
@@ -430,37 +448,129 @@
         topSelect.getInputs().add(indexSubTree.rootRef);
         topSelect.setExecutionMode(ExecutionMode.LOCAL);
         context.computeAndSetTypeEnvironmentForOperator(topSelect);
-        joinRef.setValue(topSelect);
+        ILogicalOperator topOp = topSelect;
 
         // Hook up the indexed-nested loop join path with the "panic" (non indexed) nested-loop join path by putting a union all on top.
         if (panicJoinRef != null) {
-            // Gather live variables from the index plan and the panic plan.
-            List<LogicalVariable> indexPlanLiveVars = originalLiveVars;
+            LogicalVariable inputSearchVar = getInputSearchVar(optFuncExpr, indexSubTree);
+            indexSubTreeLiveVars.addAll(surrogateSubTreePKs);
+            indexSubTreeLiveVars.add(inputSearchVar);
             List<LogicalVariable> panicPlanLiveVars = new ArrayList<LogicalVariable>();
             VariableUtilities.getLiveVariables(panicJoinRef.getValue(), panicPlanLiveVars);
-            if (indexPlanLiveVars.size() != panicPlanLiveVars.size()) {
-                throw new AlgebricksException("Unequal number of variables returned from index plan and panic plan.");
-            }
             // Create variable mapping for union all operator.
             List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = new ArrayList<Triple<LogicalVariable, LogicalVariable, LogicalVariable>>();
-            for (int i = 0; i < indexPlanLiveVars.size(); i++) {
-                varMap.add(new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(indexPlanLiveVars.get(i),
-                        panicPlanLiveVars.get(i), indexPlanLiveVars.get(i)));
+            for (int i = 0; i < indexSubTreeLiveVars.size(); i++) {
+                LogicalVariable indexSubTreeVar = indexSubTreeLiveVars.get(i);
+                LogicalVariable panicPlanVar = panicVarMap.get(indexSubTreeVar);
+                if (panicPlanVar == null) {
+                    panicPlanVar = indexSubTreeVar;
+                }
+                varMap.add(new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(indexSubTreeVar, panicPlanVar,
+                        indexSubTreeVar));
             }
             UnionAllOperator unionAllOp = new UnionAllOperator(varMap);
-            unionAllOp.getInputs().add(new MutableObject<ILogicalOperator>(joinRef.getValue()));
+            unionAllOp.getInputs().add(new MutableObject<ILogicalOperator>(topOp));
             unionAllOp.getInputs().add(panicJoinRef);
             unionAllOp.setExecutionMode(ExecutionMode.PARTITIONED);
             context.computeAndSetTypeEnvironmentForOperator(unionAllOp);
-            joinRef.setValue(unionAllOp);
+            topOp = unionAllOp;
         }
+
+        // Place a top-level equi-join on top to retrieve the missing variables from the original probe subtree.
+        // The inner (build) branch of the join is the subtree with the data scan, since the result of the similarity join could potentially be big.
+        // This choice may not always be the most efficient, but it seems more robust than the alternative.
+        Mutable<ILogicalExpression> eqJoinConditionRef = createPrimaryKeysEqJoinCondition(originalSubTreePKs,
+                surrogateSubTreePKs);
+        InnerJoinOperator topEqJoin = new InnerJoinOperator(eqJoinConditionRef, originalProbeSubTreeRootRef,
+                new MutableObject<ILogicalOperator>(topOp));
+        topEqJoin.setExecutionMode(ExecutionMode.PARTITIONED);
+        joinRef.setValue(topEqJoin);
+        context.computeAndSetTypeEnvironmentForOperator(topEqJoin);
+
         return true;
     }
+    
+    /**
+     * Deep-copies the probeSubTree (using new variables), and re-initializes the probeSubTree from that copy.
+     * Remaps the variables in the given joinCond to the copy's; optFuncExpr's variables are kept in the copy.
+     * Returns a reference to the original plan root (where those variables are swapped for the copy's).
+     */
+    private Mutable<ILogicalOperator> copyAndReinitProbeSubTree(OptimizableOperatorSubTree probeSubTree,
+            ILogicalExpression joinCond, IOptimizableFuncExpr optFuncExpr, IOptimizationContext context)
+            throws AlgebricksException {
+        // Copy the probe subtree, replacing its variables with new ones. The original variables are used
+        // by the caller to stitch together a top-level equi join.
+        Counter counter = new Counter(context.getVarCounter());
+        LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
+        ILogicalOperator probeSubTreeCopy = deepCopyVisitor.deepCopy(probeSubTree.root, null);
+        Mutable<ILogicalOperator> probeSubTreeRootRef = new MutableObject<ILogicalOperator>(probeSubTreeCopy);
+        context.setVarCounter(counter.get());
+
+        // Remember the root reference of the original probe subtree (this is the value returned below),
+        // so that the caller can later retrieve the missing attributes from it via an equi join.
+        Mutable<ILogicalOperator> originalProbeSubTreeRootRef = probeSubTree.rootRef;
+
+        // Re-initialize probeSubTree from the copy, then restore the dataset and record type saved beforehand.
+        Dataset origDataset = probeSubTree.dataset;
+        ARecordType origRecordType = probeSubTree.recordType;
+        probeSubTree.initFromSubTree(probeSubTreeRootRef);
+        inferTypes(probeSubTreeRootRef.getValue(), context);
+        probeSubTree.dataset = origDataset;
+        probeSubTree.recordType = origRecordType;
+
+        // Copying the subtree caused all variables to be changed. However, we want to retain the original
+        // secondary search key variable through the secondary-to-primary index search plan.
+        // So the copied subtree gets the original variable back, and the original subtree takes the replacement.
+        Map<LogicalVariable, LogicalVariable> varMapping = deepCopyVisitor.getVariableMapping();
+        LogicalVariable secondarySearchKeyVar = null;
+        for (int i = 0; i < optFuncExpr.getNumLogicalVars(); i++) {
+            LogicalVariable optFuncVar = optFuncExpr.getLogicalVar(i);
+            LogicalVariable remappedVar = varMapping.get(optFuncVar);
+            if (remappedVar != null) {
+                VariableUtilities.substituteVariablesInDescendantsAndSelf(originalProbeSubTreeRootRef.getValue(),
+                        optFuncVar, remappedVar, context);
+                VariableUtilities.substituteVariablesInDescendantsAndSelf(probeSubTree.root, remappedVar, optFuncVar,
+                        context);
+                secondarySearchKeyVar = optFuncVar;
+            }
+        }
+
+        // Replace the variables in the join condition with their counterparts from the copied probe subtree.
+        for (Map.Entry<LogicalVariable, LogicalVariable> varMapEntry : varMapping.entrySet()) {
+            // Skip the search key var, which stays unchanged. NOTE(review): only the last match above is kept.
+            if (varMapEntry.getKey() == secondarySearchKeyVar) {
+                continue;
+            }
+            joinCond.substituteVar(varMapEntry.getKey(), varMapEntry.getValue());
+        }
+        return originalProbeSubTreeRootRef;
+    }
+    
+    /** Builds "surrogatePK[i] = originalPK[i]" per original PK, wrapped in an AND unless exactly one exists. */
+    private Mutable<ILogicalExpression> createPrimaryKeysEqJoinCondition(List<LogicalVariable> originalSubTreePKs,
+            List<LogicalVariable> surrogateSubTreePKs) {
+        List<Mutable<ILogicalExpression>> conjuncts = new ArrayList<Mutable<ILogicalExpression>>();
+        for (int pk = 0, pkCount = originalSubTreePKs.size(); pk < pkCount; pk++) {
+            ILogicalExpression surrogateRef = new VariableReferenceExpression(surrogateSubTreePKs.get(pk));
+            ILogicalExpression originalRef = new VariableReferenceExpression(originalSubTreePKs.get(pk));
+            List<Mutable<ILogicalExpression>> eqArgs = new ArrayList<Mutable<ILogicalExpression>>();
+            eqArgs.add(new MutableObject<ILogicalExpression>(surrogateRef));
+            eqArgs.add(new MutableObject<ILogicalExpression>(originalRef));
+            conjuncts.add(new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AlgebricksBuiltinFunctions.EQ), eqArgs)));
+        }
+        if (conjuncts.size() == 1) {
+            // A single equality is returned as is, without an enclosing AND.
+            return conjuncts.get(0);
+        }
+        return new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+                FunctionUtils.getFunctionInfo(AlgebricksBuiltinFunctions.AND), conjuncts));
+    }
 
     private Mutable<ILogicalOperator> createPanicNestedLoopJoinPlan(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree,
-            IOptimizableFuncExpr optFuncExpr, Index chosenIndex, IOptimizationContext context)
-            throws AlgebricksException {
+            IOptimizableFuncExpr optFuncExpr, Index chosenIndex, Map<LogicalVariable, LogicalVariable> panicVarMap,
+            IOptimizationContext context) throws AlgebricksException {
         LogicalVariable inputSearchVar = getInputSearchVar(optFuncExpr, indexSubTree);
 
         // We split the plan into two "branches", and add selections on each side.
@@ -470,8 +580,8 @@
         context.computeAndSetTypeEnvironmentForOperator(replicateOp);
 
         // Create select ops for removing tuples that are filterable and not filterable, respectively.
-        IVariableTypeEnvironment topTypeEnv = context.getOutputTypeEnvironment(joinRef.getValue());
-        IAType inputSearchVarType = (IAType) topTypeEnv.getVarType(inputSearchVar);
+        IVariableTypeEnvironment probeTypeEnv = context.getOutputTypeEnvironment(probeSubTree.root);
+        IAType inputSearchVarType = (IAType) probeTypeEnv.getVarType(inputSearchVar);
         Mutable<ILogicalOperator> isFilterableSelectOpRef = new MutableObject<ILogicalOperator>();
         Mutable<ILogicalOperator> isNotFilterableSelectOpRef = new MutableObject<ILogicalOperator>();
         createIsFilterableSelectOps(replicateOp, inputSearchVar, inputSearchVarType, optFuncExpr, chosenIndex, context,
@@ -485,6 +595,8 @@
         LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
         ILogicalOperator scanSubTree = deepCopyVisitor.deepCopy(indexSubTree.root, null);
         context.setVarCounter(counter.get());
+        Map<LogicalVariable, LogicalVariable> copyVarMap = deepCopyVisitor.getVariableMapping();
+        panicVarMap.putAll(copyVarMap);
 
         List<LogicalVariable> copyLiveVars = new ArrayList<LogicalVariable>();
         VariableUtilities.getLiveVariables(scanSubTree, copyLiveVars);
@@ -493,14 +605,8 @@
         // condition since we deep-copied one of the scanner subtrees which
         // changed variables. 
         InnerJoinOperator joinOp = (InnerJoinOperator) joinRef.getValue();
-        // Substitute vars in the join condition due to copying of the scanSubTree.
-        List<LogicalVariable> joinCondUsedVars = new ArrayList<LogicalVariable>();
-        VariableUtilities.getUsedVariables(joinOp, joinCondUsedVars);
-        for (int i = 0; i < joinCondUsedVars.size(); i++) {
-            int ix = originalLiveVars.indexOf(joinCondUsedVars.get(i));
-            if (ix >= 0) {
-                joinOp.getCondition().getValue().substituteVar(originalLiveVars.get(ix), copyLiveVars.get(ix));
-            }
+        for (Map.Entry<LogicalVariable, LogicalVariable> entry : copyVarMap.entrySet()) {
+            joinOp.getCondition().getValue().substituteVar(entry.getKey(), entry.getValue());
         }
         joinOp.getInputs().clear();
         joinOp.getInputs().add(new MutableObject<ILogicalOperator>(scanSubTree));
@@ -816,4 +922,11 @@
             }
         }
     }
+    
+    private void inferTypes(ILogicalOperator op, IOptimizationContext context) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> inputRef : op.getInputs()) { // Post-order: every input subtree first ...
+            inferTypes(inputRef.getValue(), context);
+        }
+        context.computeAndSetTypeEnvironmentForOperator(op); // ... and only then the operator itself.
+    }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableFuncExpr.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableFuncExpr.java
index 13e515a..dbbe6d9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableFuncExpr.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableFuncExpr.java
@@ -96,4 +96,16 @@
     public OptimizableOperatorSubTree getOperatorSubTree(int index) {
         return subTrees[index];
     }
+
+    /** Replaces the first slot of logicalVars that holds {@code original} (compared by reference), if any. */
+    @Override
+    public void substituteVar(LogicalVariable original, LogicalVariable substitution) {
+        int numVars = (logicalVars == null) ? 0 : logicalVars.length;
+        for (int pos = 0; pos < numVars; pos++) {
+            if (logicalVars[pos] == original) {
+                logicalVars[pos] = substitution;
+                return;
+            }
+        }
+    }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
index 3269be7..fd1b89d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
@@ -8,6 +8,7 @@
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -16,6 +17,7 @@
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -124,4 +126,11 @@
         dataset = null;
         recordType = null;
     }
+    
+    /** Appends the first N variables of dataSourceScan to target, N being the dataset's partitioning-key count. */
+    public void getPrimaryKeyVars(List<LogicalVariable> target) {
+        for (int pos = 0, pkCount = DatasetUtils.getPartitioningKeys(dataset).size(); pos < pkCount; pos++) {
+            target.add(dataSourceScan.getVariables().get(pos));
+        }
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
index d383955..976bd87 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
@@ -77,4 +77,22 @@
     public Object dropFromCache(MetadataCache cache) {
         return cache.dropDataset(this);
     }
+    
+    /**
+     * Two datasets are equal when both their dataverse names and their dataset names are equal.
+     * NOTE(review): no matching hashCode() is visible in this change - confirm that one exists.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (other instanceof Dataset) {
+            Dataset that = (Dataset) other;
+            // The names of the argument are the receivers of equals(); dataverse is compared first.
+            return that.dataverseName.equals(dataverseName)
+                    && that.datasetName.equals(datasetName);
+        }
+        return false;
+    }
 }
\ No newline at end of file