Algebricks fix for issue 873.

Change-Id: I78a4a30638d6cc5681b5410046fff6345b515291
Reviewed-on: https://asterix-gerrit.ics.uci.edu/266
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wenhai Li <lwhaymail@yahoo.com>
Reviewed-by: Ildar Absalyamov <ildar.absalyamov@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/EquivalenceClass.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/EquivalenceClass.java
index e3eb8d0..ab95c9c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/EquivalenceClass.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/EquivalenceClass.java
@@ -14,39 +14,64 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.base;
 
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
 
 public final class EquivalenceClass {
-    private List<LogicalVariable> members;
+    private Set<ILogicalExpression> expressionMembers = new ListSet<ILogicalExpression>();
+    private Set<LogicalVariable> members = new ListSet<LogicalVariable>();
     private ILogicalExpression constRepresentative;
     private LogicalVariable variableRepresentative;
     private boolean representativeIsConst;
 
-    public EquivalenceClass(List<LogicalVariable> members, ILogicalExpression constRepresentative) {
-        this.members = members;
+    public EquivalenceClass(Collection<LogicalVariable> members, ILogicalExpression constRepresentative) {
+        this.members.addAll(members);
         this.constRepresentative = constRepresentative;
         representativeIsConst = true;
     }
 
-    public EquivalenceClass(List<LogicalVariable> members, LogicalVariable variableRepresentative) {
-        this.members = members;
+    public EquivalenceClass(Collection<LogicalVariable> members, LogicalVariable variableRepresentative) {
+        this.members.addAll(members);
         this.variableRepresentative = variableRepresentative;
         representativeIsConst = false;
     }
 
+    public EquivalenceClass(Collection<LogicalVariable> members, ILogicalExpression constRepresentative,
+            Collection<ILogicalExpression> expressionMembers) {
+        this(members, constRepresentative);
+        this.expressionMembers.addAll(expressionMembers);
+    }
+
+    public EquivalenceClass(Collection<LogicalVariable> members, LogicalVariable variableRepresentative,
+            Collection<ILogicalExpression> expressionMembers) {
+        this(members, variableRepresentative);
+        this.expressionMembers.addAll(expressionMembers);
+    }
+
     public boolean representativeIsConst() {
         return representativeIsConst;
     }
 
-    public List<LogicalVariable> getMembers() {
+    public Collection<LogicalVariable> getMembers() {
         return members;
     }
 
+    public Collection<ILogicalExpression> getExpressionMembers() {
+        return expressionMembers;
+    }
+
     public boolean contains(LogicalVariable var) {
         return members.contains(var);
     }
 
+    public boolean contains(ILogicalExpression expr) {
+        return expressionMembers.contains(expr);
+    }
+
     public ILogicalExpression getConstRepresentative() {
         return constRepresentative;
     }
@@ -71,6 +96,7 @@
             representativeIsConst = true;
             constRepresentative = ec2.getConstRepresentative();
         }
+        expressionMembers.addAll(ec2.getExpressionMembers());
     }
 
     public void addMember(LogicalVariable v) {
@@ -91,7 +117,8 @@
 
     @Override
     public String toString() {
-        return "(<" + (representativeIsConst ? constRepresentative : variableRepresentative) + "> " + members + ")";
+        return "(<" + (representativeIsConst ? constRepresentative : variableRepresentative) + "> " + members + ";"
+                + expressionMembers + ")";
     }
 
     @Override
@@ -103,6 +130,9 @@
             if (!members.equals(ec.getMembers())) {
                 return false;
             }
+            if (!expressionMembers.equals(ec.getExpressionMembers())) {
+                return false;
+            }
             if (representativeIsConst) {
                 return ec.representativeIsConst() && (constRepresentative.equals(ec.getConstRepresentative()));
             } else {
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index fcccee1..1d7f134 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -21,6 +21,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -93,6 +94,12 @@
         ILogicalOperator inp1 = op.getInputs().get(0).getValue();
         Map<LogicalVariable, EquivalenceClass> eqClasses = getOrComputeEqClasses(inp1, ctx);
         ctx.putEquivalenceClassMap(op, eqClasses);
+
+        // Propagates equivalence classes that from expressions.
+        // Note that an equivalence class can also contain expression members.
+        propagateEquivalenceFromExpressionsToVariables(eqClasses, op.getExpressions(), op.getVariables());
+
+        // Generates FDs.
         List<LogicalVariable> used = new ArrayList<LogicalVariable>();
         VariableUtilities.getUsedVariables(op, used);
         List<FunctionalDependency> fds1 = getOrComputeFDs(inp1, ctx);
@@ -115,7 +122,9 @@
     @Override
     public Void visitDataScanOperator(DataSourceScanOperator op, IOptimizationContext ctx) throws AlgebricksException {
         ILogicalOperator inp1 = op.getInputs().get(0).getValue();
-        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrComputeEqClasses(inp1, ctx);
+        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrCreateEqClasses(op, ctx);
+        Map<LogicalVariable, EquivalenceClass> propagatedEqClasses = getOrComputeEqClasses(inp1, ctx);
+        eqClasses.putAll(propagatedEqClasses);
         ctx.putEquivalenceClassMap(op, eqClasses);
         List<FunctionalDependency> fds = new ArrayList<FunctionalDependency>(getOrComputeFDs(inp1, ctx));
         ctx.putFDList(op, fds);
@@ -426,13 +435,14 @@
 
     @Override
     public Void visitSelectOperator(SelectOperator op, IOptimizationContext ctx) throws AlgebricksException {
-        Map<LogicalVariable, EquivalenceClass> equivalenceClasses = new HashMap<LogicalVariable, EquivalenceClass>();
-        List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
+        ILogicalOperator childOp = op.getInputs().get(0).getValue();
+        Map<LogicalVariable, EquivalenceClass> equivalenceClasses = getOrComputeEqClasses(childOp, ctx);
         ctx.putEquivalenceClassMap(op, equivalenceClasses);
+
+        List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
         ctx.putFDList(op, functionalDependencies);
-        ILogicalOperator op0 = op.getInputs().get(0).getValue();
-        functionalDependencies.addAll(getOrComputeFDs(op0, ctx));
-        equivalenceClasses.putAll(getOrComputeEqClasses(op0, ctx));
+        functionalDependencies.addAll(getOrComputeFDs(childOp, ctx));
+        equivalenceClasses.putAll(getOrComputeEqClasses(childOp, ctx));
         ILogicalExpression expr = op.getCondition().getValue();
         expr.getConstraintsAndEquivClasses(functionalDependencies, equivalenceClasses);
         return null;
@@ -496,20 +506,19 @@
 
     @Override
     public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext ctx) throws AlgebricksException {
-        setEmptyFDsEqClasses(op, ctx);
+        propagateFDsAndEquivClasses(op, ctx);
         return null;
     }
 
     @Override
     public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
-        setEmptyFDsEqClasses(op, ctx);
+        propagateFDsAndEquivClasses(op, ctx);
         return null;
     }
 
     @Override
-    public Void visitTokenizeOperator(TokenizeOperator op, IOptimizationContext ctx)
-            throws AlgebricksException {
+    public Void visitTokenizeOperator(TokenizeOperator op, IOptimizationContext ctx) throws AlgebricksException {
         setEmptyFDsEqClasses(op, ctx);
         return null;
     }
@@ -538,6 +547,16 @@
         return eqClasses;
     }
 
+    private Map<LogicalVariable, EquivalenceClass> getOrCreateEqClasses(ILogicalOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        Map<LogicalVariable, EquivalenceClass> eqClasses = ctx.getEquivalenceClassMap(op);
+        if (eqClasses == null) {
+            eqClasses = new HashMap<LogicalVariable, EquivalenceClass>();
+            ctx.putEquivalenceClassMap(op, eqClasses);
+        }
+        return eqClasses;
+    }
+
     private List<FunctionalDependency> getOrComputeFDs(ILogicalOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
         List<FunctionalDependency> fds = ctx.getFDList(op);
@@ -548,15 +567,28 @@
         return fds;
     }
 
+    /***
+     * Propagated equivalent classes from the child to the current operator, based
+     * on the used variables of the current operator.
+     * 
+     * @param op
+     *            , the current operator
+     * @param ctx
+     *            , the optimization context which keeps track of all equivalent classes.
+     * @param usedVariables
+     *            , used variables.
+     * @throws AlgebricksException
+     */
     private void propagateFDsAndEquivClassesForUsedVars(ILogicalOperator op, IOptimizationContext ctx,
             List<LogicalVariable> usedVariables) throws AlgebricksException {
         ILogicalOperator op2 = op.getInputs().get(0).getValue();
-        Map<LogicalVariable, EquivalenceClass> eqClasses = new HashMap<LogicalVariable, EquivalenceClass>();
-        ctx.putEquivalenceClassMap(op, eqClasses);
+        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrCreateEqClasses(op, ctx);
         List<FunctionalDependency> fds = new ArrayList<FunctionalDependency>();
         ctx.putFDList(op, fds);
 
         Map<LogicalVariable, EquivalenceClass> chldClasses = getOrComputeEqClasses(op2, ctx);
+
+        // Propagate equivalent classes that contain the used variables.
         for (LogicalVariable v : usedVariables) {
             EquivalenceClass ec = eqClasses.get(v);
             if (ec == null) {
@@ -584,6 +616,31 @@
             }
         }
 
+        // Propagates equivalent classes that contain expressions that use the used variables.
+        // Note that for the case variable $v is not in the used variables but it is
+        // equivalent to field-access($t, i) and $t is a used variable, the equivalent
+        // class should still be propagated (kept).
+        Set<LogicalVariable> usedVarSet = new HashSet<LogicalVariable>(usedVariables);
+        for (Entry<LogicalVariable, EquivalenceClass> entry : chldClasses.entrySet()) {
+            EquivalenceClass ec = entry.getValue();
+            for (ILogicalExpression expr : ec.getExpressionMembers()) {
+                Set<LogicalVariable> exprUsedVars = new HashSet<LogicalVariable>();
+                expr.getUsedVariables(exprUsedVars);
+                exprUsedVars.retainAll(usedVarSet);
+                // Check if the expression member uses a used variable.
+                if (!exprUsedVars.isEmpty()) {
+                    for (LogicalVariable v : ec.getMembers()) {
+                        eqClasses.put(v, ec);
+                        // If variable members contain a used variable, the representative
+                        // variable should be a used variable.
+                        if (usedVarSet.contains(v)) {
+                            ec.setVariableRepresentative(v);
+                        }
+                    }
+                }
+            }
+        }
+
         List<FunctionalDependency> chldFds = getOrComputeFDs(op2, ctx);
         for (FunctionalDependency fd : chldFds) {
             if (!usedVariables.containsAll(fd.getHead())) {
@@ -605,7 +662,13 @@
     private void fdsEqClassesForAbstractUnnestOperator(AbstractUnnestOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
         ILogicalOperator inp1 = op.getInputs().get(0).getValue();
-        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrComputeEqClasses(inp1, ctx);
+        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrCreateEqClasses(op, ctx);
+        Map<LogicalVariable, EquivalenceClass> propagatedEqClasses = getOrComputeEqClasses(inp1, ctx);
+        /**
+         * The original eq classes of unnest-map are only for produced variables, therefore
+         * eqClasses and propagatedEqClasses do not have overlaps.
+         */
+        eqClasses.putAll(propagatedEqClasses);
         ctx.putEquivalenceClassMap(op, eqClasses);
         List<FunctionalDependency> fds = getOrComputeFDs(inp1, ctx);
         ctx.putFDList(op, fds);
@@ -670,4 +733,31 @@
         return null;
     }
 
+    /**
+     * Propagate equivalences that carried in expressions to the variables that
+     * they are assigned to.
+     * 
+     * @param eqClasses
+     *            an equivalent class map
+     * @param assignExprs
+     *            expressions on the right-hand-side of assignments
+     * @param assignVars
+     *            variables on the left-hand-side of assignments
+     */
+    private void propagateEquivalenceFromExpressionsToVariables(Map<LogicalVariable, EquivalenceClass> eqClasses,
+            List<Mutable<ILogicalExpression>> assignExprs, List<LogicalVariable> assignVars) {
+        for (int assignVarIndex = 0; assignVarIndex < assignVars.size(); ++assignVarIndex) {
+            LogicalVariable var = assignVars.get(assignVarIndex);
+            ILogicalExpression expr = assignExprs.get(assignVarIndex).getValue();
+            for (Entry<LogicalVariable, EquivalenceClass> entry : eqClasses.entrySet()) {
+                EquivalenceClass eqc = entry.getValue();
+                // If the equivalence class contains the right-hand-side expression,
+                // the left-hand-side variable is added into the equivalence class.
+                if (eqc.contains(expr)) {
+                    eqc.addMember(var);
+                }
+            }
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 3b115eb..ebbca8b 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -37,7 +37,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 6d79e1f..a3f4e6f 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -208,12 +208,12 @@
             if (appender.getTupleCount() > 0) {
                 FrameUtils.flushFrame(writeBuffer, writer);
             }
-            writer.close();
             try {
                 cursor.close();
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
+            writer.close();
         } finally {
             indexHelper.close();
         }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index a1e7dfe..7bfb378 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -591,7 +591,7 @@
         String lastFileName = lastInvIndex.getBTree().getFileReference().getFile().getName();
 
         LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFileName, lastFileName);
-        ILSMIndexAccessorInternal accessor = new LSMInvertedIndexAccessor(lsmHarness, ctx);
+        ILSMIndexAccessorInternal accessor = new LSMInvertedIndexAccessor(lsmHarness, ictx);
         ioScheduler.scheduleOperation(new LSMInvertedIndexMergeOperation(accessor, mergingComponents, cursor,
                 relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
                 relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));