ASTERIXDB-1674: fix LoadRecordFieldsRule and RemoveRedundantVariablesRule.

Change-Id: I52b2e3afac5ea99c13723cfc707821f95220a54a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1251
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/LoadRecordFieldsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/LoadRecordFieldsRule.java
index d444aca..f7f2de6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/LoadRecordFieldsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/LoadRecordFieldsRule.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.util.ConstantExpressionUtil;
 import org.apache.asterix.optimizer.base.AnalysisUtil;
 import org.apache.commons.lang3.mutable.Mutable;
@@ -44,6 +45,7 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -79,7 +81,7 @@
             AssignOperator a1 = (AssignOperator) op1;
             ILogicalExpression expr = getFirstExpr(a1);
             if (AnalysisUtil.isAccessToFieldRecord(expr)) {
-                boolean res = findAndEliminateRedundantFieldAccess(a1);
+                boolean res = findAndEliminateRedundantFieldAccess(a1, context);
                 context.addToDontApplySet(this, op1);
                 return res;
             }
@@ -184,8 +186,8 @@
             a2InptList.clear();
             a2InptList.add(topChild);
             // and link it as child in the op. tree
-            topOp.getInputs().set(0, new MutableObject<ILogicalOperator>(a2));
-            findAndEliminateRedundantFieldAccess(a2);
+            topOp.getInputs().set(0, new MutableObject<>(a2));
+            findAndEliminateRedundantFieldAccess(a2, context);
         } else { // e.g., a join
             LinkedList<LogicalVariable> usedInAccess = new LinkedList<LogicalVariable>();
             VariableUtilities.getUsedVariables(a2, usedInAccess);
@@ -231,9 +233,9 @@
             IOptimizationContext context) throws AlgebricksException {
         List<Mutable<ILogicalOperator>> tpInpList = toPush.getInputs();
         tpInpList.clear();
-        tpInpList.add(new MutableObject<ILogicalOperator>(toPushThroughChildRef.getValue()));
+        tpInpList.add(new MutableObject<>(toPushThroughChildRef.getValue()));
         toPushThroughChildRef.setValue(toPush);
-        findAndEliminateRedundantFieldAccess(toPush);
+        findAndEliminateRedundantFieldAccess(toPush, context);
     }
 
     /**
@@ -244,7 +246,8 @@
      * assign $x := Expr
      * assign $y := record-constructor { "field": Expr, ... }
      */
-    private static boolean findAndEliminateRedundantFieldAccess(AssignOperator assign) throws AlgebricksException {
+    private static boolean findAndEliminateRedundantFieldAccess(AssignOperator assign, IOptimizationContext context)
+            throws AlgebricksException {
         ILogicalExpression expr = getFirstExpr(assign);
         AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expr;
         ILogicalExpression arg0 = f.getArguments().get(0).getValue();
@@ -257,15 +260,16 @@
         if (arg1.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
             return false;
         }
+        IVariableTypeEnvironment typeEnvironment = context.getOutputTypeEnvironment(assign);
         ConstantExpression ce = (ConstantExpression) arg1;
         ILogicalExpression fldExpr;
         if (f.getFunctionIdentifier().equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME)) {
             String fldName = ((AString) ((AsterixConstantValue) ce.getValue()).getObject()).getStringValue();
-            fldExpr = findFieldExpression(assign, recordVar, fldName,
-                    LoadRecordFieldsRule::findFieldByNameFromRecordConstructor);
+            fldExpr = findFieldExpression(assign, recordVar, fldName, typeEnvironment,
+                    (name, expression, env) -> findFieldByNameFromRecordConstructor(name, expression));
         } else if (f.getFunctionIdentifier().equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX)) {
             Integer fldIdx = ((AInt32) ((AsterixConstantValue) ce.getValue()).getObject()).getIntegerValue();
-            fldExpr = findFieldExpression(assign, recordVar, fldIdx,
+            fldExpr = findFieldExpression(assign, recordVar, fldIdx, typeEnvironment,
                     LoadRecordFieldsRule::findFieldByIndexFromRecordConstructor);
         } else if (f.getFunctionIdentifier().equals(AsterixBuiltinFunctions.FIELD_ACCESS_NESTED)) {
             return false;
@@ -292,12 +296,14 @@
 
     @FunctionalInterface
     private interface FieldResolver {
-        public ILogicalExpression resolve(Object accessKey, AbstractFunctionCallExpression funcExpr);
+        ILogicalExpression resolve(Object accessKey, AbstractFunctionCallExpression funcExpr,
+                IVariableTypeEnvironment typeEnvironment) throws AlgebricksException;
     }
 
     // Finds a field expression.
     private static ILogicalExpression findFieldExpression(AbstractLogicalOperator op, LogicalVariable recordVar,
-            Object accessKey, FieldResolver resolver) {
+            Object accessKey, IVariableTypeEnvironment typeEnvironment, FieldResolver resolver)
+            throws AlgebricksException {
         for (Mutable<ILogicalOperator> child : op.getInputs()) {
             AbstractLogicalOperator opChild = (AbstractLogicalOperator) child.getValue();
             if (opChild.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
@@ -306,18 +312,19 @@
                 if (i >= 0) {
                     AbstractLogicalExpression constr = (AbstractLogicalExpression) op2.getExpressions().get(i)
                             .getValue();
-                    return resolveFieldExpression(constr, accessKey, resolver);
+                    return resolveFieldExpression(constr, accessKey, typeEnvironment, resolver);
                 }
             } else if (opChild.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
                 NestedTupleSourceOperator nts = (NestedTupleSourceOperator) opChild;
                 AbstractLogicalOperator opBelowNestedPlan = (AbstractLogicalOperator) nts.getDataSourceReference()
                         .getValue().getInputs().get(0).getValue();
-                ILogicalExpression expr1 = findFieldExpression(opBelowNestedPlan, recordVar, accessKey, resolver);
+                ILogicalExpression expr1 = findFieldExpression(opBelowNestedPlan, recordVar, accessKey, typeEnvironment,
+                        resolver);
                 if (expr1 != null) {
                     return expr1;
                 }
             }
-            ILogicalExpression expr2 = findFieldExpression(opChild, recordVar, accessKey, resolver);
+            ILogicalExpression expr2 = findFieldExpression(opChild, recordVar, accessKey, typeEnvironment, resolver);
             if (expr2 != null) {
                 return expr2;
             }
@@ -327,7 +334,7 @@
 
     // Resolves field expression from an access key and a field resolver.
     private static ILogicalExpression resolveFieldExpression(AbstractLogicalExpression constr, Object accessKey,
-            FieldResolver resolver) {
+            IVariableTypeEnvironment typeEnvironment, FieldResolver resolver) throws AlgebricksException {
         if (constr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
             return null;
         }
@@ -336,7 +343,7 @@
                 && !fce.getFunctionIdentifier().equals(AsterixBuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR)) {
             return null;
         }
-        return resolver.resolve(accessKey, fce);
+        return resolver.resolve(accessKey, fce, typeEnvironment);
     }
 
     // Resolves field expression by name-based access.
@@ -355,9 +362,12 @@
 
     // Resolves field expression by index-based access.
     private static ILogicalExpression findFieldByIndexFromRecordConstructor(Object index,
-            AbstractFunctionCallExpression fce) {
+            AbstractFunctionCallExpression fce, IVariableTypeEnvironment typeEnvironment) throws AlgebricksException {
         Integer fieldIndex = (Integer) index;
-        return fce.getArguments().size() > fieldIndex ? fce.getArguments().get(2 * fieldIndex + 1).getValue() : null;
+        ARecordType recordType = (ARecordType) typeEnvironment.getType(fce);
+        String[] closedFieldNames = recordType.getFieldNames();
+        return closedFieldNames.length > fieldIndex
+                ? findFieldByNameFromRecordConstructor(closedFieldNames[fieldIndex], fce) : null;
     }
 
     private final class ExtractFieldLoadExpressionVisitor implements ILogicalExpressionReferenceTransform {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.ddl.sqlpp
new file mode 100644
index 0000000..5f8e5a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.ddl.sqlpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse TinySocial if exists;
+create  dataverse TinySocial;
+
+use TinySocial;
+
+
+create type FacebookUserType as
+ open {
+  id : bigint
+}
+
+create type FacebookMessageType as
+ open {
+  `message-id` : bigint
+}
+
+create  dataset FacebookUsers(FacebookUserType) primary key id;
+
+create  dataset FacebookMessages(FacebookMessageType) primary key `message-id`;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.2.update.sqlpp
new file mode 100644
index 0000000..b49fa5b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use TinySocial;
+
+load  dataset FacebookUsers using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
+
+load  dataset FacebookMessages using localfs ((`path`=`asterix_nc1://data/tinysocial/fbm.adm`),(`format`=`adm`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.3.query.sqlpp
new file mode 100644
index 0000000..dfa2098
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.3.query.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+USE TinySocial;
+
+WITH nested_msgs AS
+(
+  SELECT fu.name name,
+         (
+           SELECT fu.name name, fm.message msg
+           FROM FacebookMessages fm
+           WHERE fm.`author-id` = fu.id
+           ORDER BY fm.`message-id` DESC
+         ) AS msgs
+  FROM FacebookUsers fu
+)
+
+SELECT VALUE nm
+FROM nested_msgs nm
+WHERE (SOME msg IN nm.msgs SATISFIES msg.name LIKE '%Emory%')
+LIMIT 2;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.ddl.sqlpp
new file mode 100644
index 0000000..5f8e5a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.ddl.sqlpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse TinySocial if exists;
+create  dataverse TinySocial;
+
+use TinySocial;
+
+
+create type FacebookUserType as
+ open {
+  id : bigint
+}
+
+create type FacebookMessageType as
+ open {
+  `message-id` : bigint
+}
+
+create  dataset FacebookUsers(FacebookUserType) primary key id;
+
+create  dataset FacebookMessages(FacebookMessageType) primary key `message-id`;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.2.update.sqlpp
new file mode 100644
index 0000000..b49fa5b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use TinySocial;
+
+load  dataset FacebookUsers using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
+
+load  dataset FacebookMessages using localfs ((`path`=`asterix_nc1://data/tinysocial/fbm.adm`),(`format`=`adm`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.3.query.sqlpp
new file mode 100644
index 0000000..76f8501
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.3.query.sqlpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+USE TinySocial;
+
+WITH nested_msgs AS
+(
+  SELECT fu.id id, fu.name name,
+         (
+           SELECT fu.name name, fm.message msg
+           FROM FacebookMessages fm
+           WHERE fm.`author-id` = fu.id
+         ) AS msgs
+  FROM FacebookUsers fu
+)
+
+SELECT VALUE nm
+FROM nested_msgs nm
+WHERE NOT EXISTS (SELECT * FROM FacebookMessages fm WHERE fm.`author-id` = nm.id)
+ORDER BY nm.id;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.adm
new file mode 100644
index 0000000..7c7ddd0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/quantifiers/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.adm
@@ -0,0 +1 @@
+{ "msgs": [ { "name": "EmoryUnk", "msg": " love verizon its wireless is good" }, { "name": "EmoryUnk", "msg": " love sprint its shortcut-menu is awesome:)" } ], "name": "EmoryUnk" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.adm
new file mode 100644
index 0000000..b8fbf3c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/query-ASTERIXDB-1674/query-ASTERIXDB-1674.1.adm
@@ -0,0 +1,2 @@
+{ "id": 4, "msgs": [  ], "name": "NicholasStroh" }
+{ "id": 8, "msgs": [  ], "name": "NilaMilliron" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 8615ed2..38ef4c2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -4712,6 +4712,11 @@
         <output-dir compare="Text">query-ASTERIXDB-1005</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="quantifiers">
+      <compilation-unit name="query-ASTERIXDB-1674">
+        <output-dir compare="Text">query-ASTERIXDB-1674</output-dir>
+      </compilation-unit>
+    </test-case>
     <!--
         <test-case FilePath="quantifiers">
           <compilation-unit name="everysat_02">
@@ -5829,6 +5834,11 @@
         <output-dir compare="Text">query-ASTERIXDB-1597</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="subquery">
+      <compilation-unit name="query-ASTERIXDB-1674">
+        <output-dir compare="Text">query-ASTERIXDB-1674</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="subset-collection">
     <test-case FilePath="subset-collection">
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
index eb72db0..2f0913b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
@@ -25,7 +25,6 @@
 import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -85,11 +84,6 @@
         return modified;
     }
 
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
-        return false;
-    }
-
     private void updateEquivalenceClassMap(LogicalVariable lhs, LogicalVariable rhs) {
         List<LogicalVariable> equivalentVars = equivalentVarsMap.get(rhs);
         if (equivalentVars == null) {
@@ -105,10 +99,11 @@
     private boolean removeRedundantVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        LogicalOperatorTag opTag = op.getOperatorTag();
         boolean modified = false;
 
         // Update equivalence class map.
-        if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+        if (opTag == LogicalOperatorTag.ASSIGN) {
             AssignOperator assignOp = (AssignOperator) op;
             int numVars = assignOp.getVariables().size();
             for (int i = 0; i < numVars; i++) {
@@ -125,7 +120,7 @@
         }
 
         // Replace variable references with their first representative.
-        if (op.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+        if (opTag == LogicalOperatorTag.PROJECT) {
             // The project operator does not use expressions, so we need to replace it's variables manually.
             if (replaceProjectVars((ProjectOperator) op)) {
                 modified = true;
@@ -154,7 +149,7 @@
         }
 
         // Deal with re-mapping of variables in group by.
-        if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
+        if (opTag == LogicalOperatorTag.GROUP) {
             if (handleGroupByVarRemapping((GroupByOperator) op)) {
                 modified = true;
             }
@@ -164,6 +159,12 @@
             context.computeAndSetTypeEnvironmentForOperator(op);
             context.addToDontApplySet(this, op);
         }
+
+        // Clears the equivalent variable map if the current operator is the root operator
+        // in the query plan.
+        if (opTag == LogicalOperatorTag.DISTRIBUTE_RESULT || opTag == LogicalOperatorTag.SINK) {
+            equivalentVarsMap.clear();
+        }
         return modified;
     }