[NO ISSUE][COMP] Fix reference sharing in some optimizer rules

Details:
- Fixed optimizer rules that reused same operator/expression
  references or instances when creating new operators
- Fixed optimizer rules that reported that they did not make
  any plan changes when, in fact, they did

Change-Id: Ib9846f47339ea6e06fda17f4bac08a99ca5e8406
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7406
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 6c258e4..37b6ca7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -75,6 +75,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
@@ -350,31 +351,40 @@
 
                     // TokenizeOperator to tokenize [SK, PK] pairs
                     TokenizeOperator tokenUpdate = new TokenizeOperator(dataSourceIndex,
-                            primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
-                            tokenizeKeyVars, filterExpression, primaryIndexModificationOp.getOperation(),
-                            primaryIndexModificationOp.isBulkload(), isPartitioned, varTypes);
+                            OperatorManipulationUtil
+                                    .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
+                            secondaryExpressions, tokenizeKeyVars,
+                            filterExpression != null
+                                    ? new MutableObject<>(filterExpression.getValue().cloneExpression()) : null,
+                            primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
+                            isPartitioned, varTypes);
                     tokenUpdate.setSourceLocation(sourceLoc);
                     tokenUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
                     context.computeAndSetTypeEnvironmentForOperator(tokenUpdate);
                     replicateOutput = tokenUpdate;
                     indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
-                            primaryIndexModificationOp.getPrimaryKeyExpressions(), tokenizeKeyExprs, filterExpression,
-                            primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
-                            primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
-                                    : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
-                    indexUpdate.setSourceLocation(sourceLoc);
-                    indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
-                    indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(tokenUpdate));
-                } else {
-                    // When TokenizeOperator is not needed
-                    indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
-                            primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
-                            filterExpression, primaryIndexModificationOp.getOperation(),
+                            OperatorManipulationUtil
+                                    .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
+                            tokenizeKeyExprs, filterExpression, primaryIndexModificationOp.getOperation(),
                             primaryIndexModificationOp.isBulkload(),
                             primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                     : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                     indexUpdate.setSourceLocation(sourceLoc);
-                    indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
+                    indexUpdate.setAdditionalFilteringExpressions(
+                            OperatorManipulationUtil.cloneExpressions(filteringExpressions));
+                    indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(tokenUpdate));
+                } else {
+                    // When TokenizeOperator is not needed
+                    indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
+                            OperatorManipulationUtil
+                                    .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
+                            secondaryExpressions, filterExpression, primaryIndexModificationOp.getOperation(),
+                            primaryIndexModificationOp.isBulkload(),
+                            primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
+                                    : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
+                    indexUpdate.setSourceLocation(sourceLoc);
+                    indexUpdate.setAdditionalFilteringExpressions(
+                            OperatorManipulationUtil.cloneExpressions(filteringExpressions));
                     replicateOutput = indexUpdate;
                     // We add the necessary expressions for upsert
                     if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
@@ -478,12 +488,15 @@
                 }
                 DataSourceIndex dataSourceIndex = new DataSourceIndex(index, dataverseName, datasetName, mp);
                 indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
-                        primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
-                        primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
+                        OperatorManipulationUtil
+                                .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
+                        secondaryExpressions, filterExpression, primaryIndexModificationOp.getOperation(),
+                        primaryIndexModificationOp.isBulkload(),
                         primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                 : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                 indexUpdate.setSourceLocation(sourceLoc);
-                indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
+                indexUpdate.setAdditionalFilteringExpressions(
+                        OperatorManipulationUtil.cloneExpressions(filteringExpressions));
                 if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                     // set before op secondary key expressions
                     if (filteringFields != null) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
index 164a505..87a2d03 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
@@ -792,6 +792,9 @@
             } else {
                 keyVar = ((VariableReferenceExpression) searchKeyExpr).getVariableReference();
                 if (constExpression != null) {
+                    if (constExpression.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+                        constExpression = constExpression.cloneExpression();
+                    }
                     assignKeyExprList.add(new MutableObject<>(constExpression));
                     assignKeyVarList.add(constExprVars[i]);
                 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 5851467..e8ceba4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -413,22 +413,19 @@
                 assign.getInputs().add(new MutableObject<>(topOp));
             }
 
-            VariableReferenceExpression resVarRef2 = new VariableReferenceExpression(resVar);
-            resVarRef2.setSourceLocation(sourceLoc);
-            Mutable<ILogicalExpression> varRef = new MutableObject<>(resVarRef2);
             ILogicalOperator leafOperator;
             switch (stmt.getKind()) {
                 case INSERT:
-                    leafOperator = translateInsert(targetDatasource, varRef, varRefsForLoading,
+                    leafOperator = translateInsert(targetDatasource, resVar, varRefsForLoading,
                             additionalFilteringExpressions, assign, stmt, resultMetadata);
                     break;
                 case UPSERT:
-                    leafOperator = translateUpsert(targetDatasource, varRef, varRefsForLoading,
+                    leafOperator = translateUpsert(targetDatasource, resVar, varRefsForLoading,
                             additionalFilteringExpressions, assign, additionalFilteringField, unnestVar, topOp, exprs,
-                            resVar, additionalFilteringAssign, stmt, resultMetadata);
+                            additionalFilteringAssign, stmt, resultMetadata);
                     break;
                 case DELETE:
-                    leafOperator = translateDelete(targetDatasource, varRef, varRefsForLoading,
+                    leafOperator = translateDelete(targetDatasource, resVar, varRefsForLoading,
                             additionalFilteringExpressions, assign, stmt);
                     break;
                 default:
@@ -443,7 +440,7 @@
         return plan;
     }
 
-    private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+    private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, LogicalVariable resVar,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             ICompiledDmlStatement stmt) throws AlgebricksException {
@@ -453,8 +450,10 @@
                     targetDatasource.getDataset().getDatasetName()
                             + ": delete from dataset is not supported on Datasets with Meta records");
         }
-        InsertDeleteUpsertOperator deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
-                varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
+        VariableReferenceExpression varRef = new VariableReferenceExpression(resVar);
+        varRef.setSourceLocation(stmt.getSourceLocation());
+        InsertDeleteUpsertOperator deleteOp = new InsertDeleteUpsertOperator(targetDatasource,
+                new MutableObject<>(varRef), varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
         deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
         deleteOp.getInputs().add(new MutableObject<>(assign));
         deleteOp.setSourceLocation(sourceLoc);
@@ -464,11 +463,11 @@
         return leafOperator;
     }
 
-    private ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+    private ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, LogicalVariable resVar,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             List<String> additionalFilteringField, LogicalVariable unnestVar, ILogicalOperator topOp,
-            List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign,
+            List<Mutable<ILogicalExpression>> exprs, AssignOperator additionalFilteringAssign,
             ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         if (!targetDatasource.getDataset().allow(topOp, DatasetUtil.OP_UPSERT)) {
@@ -521,8 +520,10 @@
                 }
             }
             // A change feed, we don't need the assign to access PKs
-            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, metaExpSingletonList,
-                    InsertDeleteUpsertOperator.Kind.UPSERT, false);
+            VariableReferenceExpression varRef = new VariableReferenceExpression(resVar);
+            varRef.setSourceLocation(stmt.getSourceLocation());
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, new MutableObject<>(varRef), varRefsForLoading,
+                    metaExpSingletonList, InsertDeleteUpsertOperator.Kind.UPSERT, false);
             upsertOp.setUpsertIndicatorVar(context.newVar());
             upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
             // Create and add a new variable used for representing the original record
@@ -554,7 +555,9 @@
             topOp.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
             upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
         } else {
-            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+            VariableReferenceExpression varRef = new VariableReferenceExpression(resVar);
+            varRef.setSourceLocation(stmt.getSourceLocation());
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, new MutableObject<>(varRef), varRefsForLoading,
                     InsertDeleteUpsertOperator.Kind.UPSERT, false);
             upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
             upsertOp.getInputs().add(new MutableObject<>(assign));
@@ -579,7 +582,7 @@
         return processReturningExpression(rootOperator, upsertOp, compiledUpsert, resultMetadata);
     }
 
-    private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+    private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, LogicalVariable resVar,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
@@ -590,8 +593,10 @@
                             + ": insert into dataset is not supported on Datasets with Meta records");
         }
         // Adds the insert operator.
-        InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
-                varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
+        VariableReferenceExpression varRef = new VariableReferenceExpression(resVar);
+        varRef.setSourceLocation(stmt.getSourceLocation());
+        InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource,
+                new MutableObject<>(varRef), varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
         insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
         insertOp.getInputs().add(new MutableObject<>(assign));
         insertOp.setSourceLocation(sourceLoc);
@@ -620,8 +625,8 @@
 
         //Create an assign operator that makes the variable used by the return expression
         LogicalVariable insertedVar = context.newVar();
-        AssignOperator insertedVarAssignOperator =
-                new AssignOperator(insertedVar, new MutableObject<>(insertOp.getPayloadExpression().getValue()));
+        AssignOperator insertedVarAssignOperator = new AssignOperator(insertedVar,
+                new MutableObject<>(insertOp.getPayloadExpression().getValue().cloneExpression()));
         insertedVarAssignOperator.getInputs().add(insertOp.getInputs().get(0));
         insertedVarAssignOperator.setSourceLocation(sourceLoc);
         insertOp.getInputs().set(0, new MutableObject<>(insertedVarAssignOperator));
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 3f5012a..b8fe5c7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -53,6 +53,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class OperatorManipulationUtil {
 
@@ -461,4 +462,15 @@
         }
         return -1;
     }
+
+    public static List<Mutable<ILogicalExpression>> createVariableReferences(List<LogicalVariable> varList,
+            SourceLocation sourceLoc) {
+        List<Mutable<ILogicalExpression>> varRefs = new ArrayList<>(varList.size());
+        for (LogicalVariable var : varList) {
+            VariableReferenceExpression varRef = new VariableReferenceExpression(var);
+            varRef.setSourceLocation(sourceLoc);
+            varRefs.add(new MutableObject<>(varRef));
+        }
+        return varRefs;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index 6c42929..b917ce1 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -87,7 +87,7 @@
             if (!newGbyLiveVars.contains(usedVar)) {
                 // Let the left-hand side of gbyOp's decoration expressions populated through the combiner group-by without
                 // any intermediate assignment.
-                newGbyOp.addDecorExpression(null, p.second.getValue());
+                newGbyOp.addDecorExpression(null, p.second.getValue().cloneExpression());
                 newGbyLiveVars.add(usedVar);
             }
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 706028b..9af21f5 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -698,9 +698,8 @@
         // these two exchange ops are needed so that the parents of replicate stay the same during later optimizations.
         // This is because replicate operator has references to its parents. If any later optimizations add new parents,
         // then replicate would still point to the old ones.
-        MutableObject<ILogicalOperator> replicateOpRef = new MutableObject<>(replicateOp);
-        ExchangeOperator exchToLocalAgg = createOneToOneExchangeOp(replicateOpRef, ctx);
-        ExchangeOperator exchToForward = createOneToOneExchangeOp(replicateOpRef, ctx);
+        ExchangeOperator exchToLocalAgg = createOneToOneExchangeOp(new MutableObject<>(replicateOp), ctx);
+        ExchangeOperator exchToForward = createOneToOneExchangeOp(new MutableObject<>(replicateOp), ctx);
         MutableObject<ILogicalOperator> exchToLocalAggRef = new MutableObject<>(exchToLocalAgg);
         MutableObject<ILogicalOperator> exchToForwardRef = new MutableObject<>(exchToForward);
 
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
index b8dd24f..2cfa241 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -256,6 +256,7 @@
                     }
                 } else {
                     if (expr.isFunctional() && assignCommonExpression(exprEqClass, expr)) {
+                        modified = true;
                         //re-obtain the live vars after rewriting in the method called in the if condition
                         Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
                         VariableUtilities.getLiveVariables(op, liveVars);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 176ab7a..3effcc8 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -35,7 +35,6 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -48,6 +47,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
@@ -172,7 +172,6 @@
             ReplicateOperator rop = new ReplicateOperator(group.size(), materializationFlags);
             rop.setSourceLocation(candidateSourceLoc);
             rop.setPhysicalOperator(new ReplicatePOperator());
-            Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
             AbstractLogicalOperator aopCandidate = (AbstractLogicalOperator) candidate.getValue();
             List<Mutable<ILogicalOperator>> originalCandidateParents = childrenToParents.get(candidate);
 
@@ -194,14 +193,14 @@
                 AbstractLogicalOperator parent = (AbstractLogicalOperator) parentRef.getValue();
                 int index = parent.getInputs().indexOf(candidate);
                 if (parent.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
-                    parent.getInputs().set(index, ropRef);
+                    parent.getInputs().set(index, new MutableObject<>(rop));
                     rop.getOutputs().add(parentRef);
                 } else {
                     AbstractLogicalOperator exchange = new ExchangeOperator();
                     exchange.setPhysicalOperator(new OneToOneExchangePOperator());
                     exchange.setExecutionMode(rop.getExecutionMode());
                     MutableObject<ILogicalOperator> exchangeRef = new MutableObject<ILogicalOperator>(exchange);
-                    exchange.getInputs().add(ropRef);
+                    exchange.getInputs().add(new MutableObject<>(rop));
                     rop.getOutputs().add(exchangeRef);
                     context.computeAndSetTypeEnvironmentForOperator(exchange);
                     parent.getInputs().set(index, exchangeRef);
@@ -210,12 +209,6 @@
             }
             List<LogicalVariable> liveVarsNew = new ArrayList<LogicalVariable>();
             VariableUtilities.getLiveVariables(candidate.getValue(), liveVarsNew);
-            ArrayList<Mutable<ILogicalExpression>> assignExprs = new ArrayList<Mutable<ILogicalExpression>>();
-            for (LogicalVariable liveVar : liveVarsNew) {
-                VariableReferenceExpression liveVarRef = new VariableReferenceExpression(liveVar);
-                liveVarRef.setSourceLocation(candidateSourceLoc);
-                assignExprs.add(new MutableObject<ILogicalExpression>(liveVarRef));
-            }
             for (Mutable<ILogicalOperator> ref : group) {
                 if (ref.equals(candidate)) {
                     continue;
@@ -230,6 +223,8 @@
 
                 SourceLocation refSourceLoc = ref.getValue().getSourceLocation();
 
+                List<Mutable<ILogicalExpression>> assignExprs =
+                        OperatorManipulationUtil.createVariableReferences(liveVarsNew, candidateSourceLoc);
                 AbstractLogicalOperator assignOperator = new AssignOperator(liveVars, assignExprs);
                 assignOperator.setSourceLocation(refSourceLoc);
                 assignOperator.setExecutionMode(rop.getExecutionMode());
@@ -241,7 +236,7 @@
                 AbstractLogicalOperator exchOp = new ExchangeOperator();
                 exchOp.setPhysicalOperator(new OneToOneExchangePOperator());
                 exchOp.setExecutionMode(rop.getExecutionMode());
-                exchOp.getInputs().add(ropRef);
+                exchOp.getInputs().add(new MutableObject<>(rop));
                 MutableObject<ILogicalOperator> exchOpRef = new MutableObject<ILogicalOperator>(exchOp);
                 rop.getOutputs().add(exchOpRef);
                 assignOperator.getInputs().add(exchOpRef);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
index a724014..af67be2 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
@@ -177,6 +177,7 @@
             if (liveVars.size() == projectVarsTemp.size() && liveVars.containsAll(projectVarsTemp)) {
                 // The existing project has become useless. Remove it.
                 parentOp.getInputs().get(parentInputIndex).setValue(op.getInputs().get(0).getValue());
+                modified = true;
             }
         }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index 8e41a15..6d5d67d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -204,7 +204,7 @@
                             newGbyNestedPlans.add(new ALogicalPlanImpl(rootOpRef));
 
                             upperSubplanRootRefIterator.remove();
-                            changed |= true;
+                            changed = true;
                             break;
                         }
                     }
@@ -212,10 +212,12 @@
 
                 if (upperSubplanRootRefs.isEmpty()) {
                     subplanNestedPlanIterator.remove();
+                    changed = true;
                 }
             }
             if (subplan.getNestedPlans().isEmpty()) {
                 subplanOperatorIterator.remove();
+                changed = true;
             }
         }
 
@@ -228,7 +230,7 @@
         parent.getInputs().get(0).setValue(gby);
 
         // Removes unnecessary pipelines inside the group by operator.
-        cleanup(currentRootRef.getValue(), gby);
+        changed |= cleanup(currentRootRef.getValue(), gby);
 
         // Computes type environments.
         context.computeAndSetTypeEnvironmentForOperator(gby);
@@ -245,7 +247,8 @@
      *            the group-by operator.
      * @throws AlgebricksException
      */
-    private void cleanup(ILogicalOperator rootOp, GroupByOperator gby) throws AlgebricksException {
+    private boolean cleanup(ILogicalOperator rootOp, GroupByOperator gby) throws AlgebricksException {
+        boolean changed = false;
         Set<LogicalVariable> freeVars = new HashSet<>();
         OperatorPropertiesUtil.getFreeVariablesInPath(rootOp, gby, freeVars);
         Iterator<ILogicalPlan> nestedPlanIterator = gby.getNestedPlans().iterator();
@@ -259,16 +262,20 @@
                     if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
                         aggOp.getVariables().remove(varIndex);
                         aggOp.getExpressions().remove(varIndex);
+                        changed = true;
                     }
                 }
                 if (aggOp.getVariables().isEmpty()) {
                     nestRootRefIterator.remove();
+                    changed = true;
                 }
             }
             if (nestedPlan.getRoots().isEmpty()) {
                 nestedPlanIterator.remove();
+                changed = true;
             }
         }
+        return changed;
     }
 
     private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator> opRef) {