Added a new rewrite rule that eliminates unnecessary joins introduced as part of surrogate-based optimizations used in, for example, index-based fuzzy joins.

@@ -41,6 +41,7 @@
 import edu.uci.ics.asterix.optimizer.rules.PushProperJoinThroughProduct;
 import edu.uci.ics.asterix.optimizer.rules.PushSimilarityFunctionsBelowJoin;
 import edu.uci.ics.asterix.optimizer.rules.RemoveRedundantListifyRule;
+import edu.uci.ics.asterix.optimizer.rules.RemoveUnusedOneToOneEquiJoinRule;
 import edu.uci.ics.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule;
 import edu.uci.ics.asterix.optimizer.rules.SetClosedRecordConstructorsRule;
 import edu.uci.ics.asterix.optimizer.rules.SimilarityCheckRule;
@@ -192,7 +193,8 @@
         accessMethod.add(new IntroduceSelectAccessMethodRule());
         accessMethod.add(new IntroduceJoinAccessMethodRule());
         accessMethod.add(new IntroduceSecondaryIndexInsertDeleteRule());
-        accessMethod.add(new PushSimilarityFunctionsBelowJoin());        
+        accessMethod.add(new RemoveUnusedOneToOneEquiJoinRule());
+        accessMethod.add(new PushSimilarityFunctionsBelowJoin());
         accessMethod.add(new RemoveUnusedAssignAndAggregateRule());
         return accessMethod;
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang3.mutable.Mutable;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+ * Removes join operators for which all of the following conditions are true:
+ * 1. The live variables of one input branch of the join are not used in the upstream plan
+ * 2. The join is an inner equi join
+ * 3. The join condition only uses variables that correspond to primary keys of the same dataset    
+ * Notice that the last condition implies a 1:1 join, i.e., the join does not change the result cardinality.
+ * 
+ * Joins that satisfy the above conditions may be introduced by other rules 
+ * which use surrogate optimizations. Such an optimization aims to reduce data copies and communication costs by
+ * using the primary keys as surrogates for the desired data items. Typically,
+ * such a surrogate-based plan introduces a top-level join to finally resolve
+ * the surrogates to the desired data items. 
+ * In case the upstream plan does not require the original data items at all, such a top-level join is unnecessary.
+ * The purpose of this rule is to remove such unnecessary joins.
+ */
+public class RemoveUnusedOneToOneEquiJoinRule implements IAlgebraicRewriteRule {
+    private final Set<LogicalVariable> parentsUsedVars = new HashSet<LogicalVariable>();
+    private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+    private final List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+    private final List<LogicalVariable> pkVars = new ArrayList<LogicalVariable>();
+    private final List<DataSourceScanOperator> dataScans = new ArrayList<DataSourceScanOperator>();
+    private boolean hasRun = false;
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        if (hasRun) {
+            return false;
+        }
+        hasRun = true;
+        if (removeUnusedJoin(opRef)) {
+            return true;
+        }
+        return false;
+    }
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+    private boolean removeUnusedJoin(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        boolean modified = false;
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(op, usedVars);
+        // Propagate used variables from parents downwards.
+        parentsUsedVars.addAll(usedVars);
+        int numInputs = op.getInputs().size();
+        for (int i = 0; i < numInputs; i++) {
+            Mutable<ILogicalOperator> childOpRef = op.getInputs().get(i);
+            int unusedJoinBranchIndex = removeJoinFromInputBranch(childOpRef);
+            if (unusedJoinBranchIndex >= 0) {
+                int usedBranchIndex = (unusedJoinBranchIndex == 0) ? 1 : 0;
+                // Remove join at input index i, by hooking up op's input i with 
+                // the join's branch at unusedJoinBranchIndex.
+                AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) childOpRef.getValue();
+                op.getInputs().set(i, joinOp.getInputs().get(usedBranchIndex));
+                modified = true;
+            }
+            // Descend into children.
+            if (removeUnusedJoin(childOpRef)) {
+                modified = true;
+            }
+        }
+        return modified;
+    }
+    private int removeJoinFromInputBranch(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN) {
+            return -1;
+        }
+        AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) op;
+        // Make sure the join is an equi-join.
+        if (!isEquiJoin(joinOp.getCondition())) {
+            return -1;
+        }
+        int unusedJoinBranchIndex = -1;
+        for (int i = 0; i < joinOp.getInputs().size(); i++) {
+            liveVars.clear();
+            VariableUtilities.getLiveVariables(joinOp.getInputs().get(i).getValue(), liveVars);
+            liveVars.retainAll(parentsUsedVars);
+            if (liveVars.isEmpty()) {
+                // None of the live variables from this branch are used by its parents.
+                unusedJoinBranchIndex = i;
+                break;
+            }
+        }
+        if (unusedJoinBranchIndex < 0) {
+            // The variables from both branches are used in the upstream plan. We cannot remove this join.
+            return -1;
+        }
+        // Check whether one of the join branches is unused.
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(joinOp, usedVars);
+        // Check whether all used variables originate from primary keys of exactly the same dataset.
+        // Collect a list of datascans whose primary key variables are used in the join condition.
+        gatherProducingDataScans(opRef, usedVars, dataScans);
+        // Check that all datascans scan the same dataset, and that the join condition
+        // only used primary key variables of those datascans.
+        for (int i = 0; i < dataScans.size(); i++) {
+            if (i > 0) {
+                AqlDataSource prevAqlDataSource = (AqlDataSource) dataScans.get(i - 1).getDataSource();
+                AqlDataSource currAqlDataSource = (AqlDataSource) dataScans.get(i).getDataSource();
+                if (!prevAqlDataSource.getDataset().equals(currAqlDataSource.getDataset())) {
+                    return -1;
+                }
+            }
+            // Remove from the used variables all the primary key vars of this dataset.
+            fillPKVars(dataScans.get(i), pkVars);
+            usedVars.removeAll(pkVars);
+        }
+        if (!usedVars.isEmpty()) {
+            // The join condition also uses some other variables that are not primary
+            // keys from datasource scans of the same dataset.
+            return -1;
+        }
+        return unusedJoinBranchIndex;
+    }
+    private void gatherProducingDataScans(Mutable<ILogicalOperator> opRef, List<LogicalVariable> joinUsedVars,
+            List<DataSourceScanOperator> dataScans) {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
+            for (Mutable<ILogicalOperator> inputOp : op.getInputs()) {
+                gatherProducingDataScans(inputOp, joinUsedVars, dataScans);
+            }
+            return;
+        }
+        DataSourceScanOperator dataScan = (DataSourceScanOperator) op;
+        fillPKVars(dataScan, pkVars);
+        // Check if join uses all PK vars.
+        if (joinUsedVars.containsAll(pkVars)) {
+            dataScans.add(dataScan);
+        }
+    }
+    private void fillPKVars(DataSourceScanOperator dataScan, List<LogicalVariable> pkVars) {
+        pkVars.clear();
+        AqlDataSource aqlDataSource = (AqlDataSource) dataScan.getDataSource();
+        int numPKs = DatasetUtils.getPartitioningKeys(aqlDataSource.getDataset()).size();
+        pkVars.clear();
+        for (int i = 0; i < numPKs; i++) {
+            pkVars.add(dataScan.getVariables().get(i));
+        }
+    }
+    private boolean isEquiJoin(Mutable<ILogicalExpression> conditionExpr) {
+        AbstractLogicalExpression expr = (AbstractLogicalExpression) conditionExpr.getValue();
+        if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+            AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+            FunctionIdentifier funcIdent = funcExpr.getFunctionIdentifier();
+            if (funcIdent != AlgebricksBuiltinFunctions.AND && funcIdent != AlgebricksBuiltinFunctions.EQ) {
+                return false;
+            }
+        }
+        return true;
+    }
     public Object dropFromCache(MetadataCache cache) {
         return cache.dropDataset(this);
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof Dataset)) {
+            return false;
+        }
+        Dataset otherDataset = (Dataset) other;
+        if (!otherDataset.dataverseName.equals(dataverseName)) {
+            return false;
+        }
+        if (!otherDataset.datasetName.equals(datasetName)) {
+            return false;
+        }
+        return true;
+    }
