[NO ISSUE][COMP] Push filters through UNION ALL

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Push assign and select operators through UnionAll operator
- Add testcases for pushing filters through UNION ALL
- Remove position writer from Unnest operator
- Add RemoveRedundantVariablesInUnionRule
- Refactor rules that push operators through UnionAll into
  a single extensible PushMapOperatorThroughUnionRule
- ExtractCommonOperatorsRule must compute schema for
  all operators it is creating
- Ensure that UnionAllOperator.recomputeSchema() always returns
  each output variable in the output schema
- OperatorSchemaImpl.addNewVariables() should handle duplicate
  entries of new variables that are being added
- Move isMap() from AbstractUnnestOperator to its subclasses
- Make ListSet.addAll() consistent with Set.addAll()

Change-Id: I0d6325a44b7051fe7fcd07ecc6e0311dc6c99938
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7963
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 5ebb5bc..57ae375 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -30,7 +30,7 @@
 import org.apache.asterix.optimizer.rules.AsterixExtractFunctionsFromJoinConditionRule;
 import org.apache.asterix.optimizer.rules.AsterixInlineVariablesRule;
 import org.apache.asterix.optimizer.rules.AsterixIntroduceGroupByCombinerRule;
-import org.apache.asterix.optimizer.rules.AsterixPushAssignBelowUnionAllRule;
+import org.apache.asterix.optimizer.rules.AsterixPushMapOperatorThroughUnionRule;
 import org.apache.asterix.optimizer.rules.ByNameToByIndexFieldAccessRule;
 import org.apache.asterix.optimizer.rules.CancelUnnestSingletonListRule;
 import org.apache.asterix.optimizer.rules.CancelUnnestWithNestedListifyRule;
@@ -94,6 +94,7 @@
 import org.apache.asterix.optimizer.rules.subplan.AsterixMoveFreeVariableOperatorOutOfSubplanRule;
 import org.apache.asterix.optimizer.rules.subplan.InlineSubplanInputForNestedTupleSourceRule;
 import org.apache.asterix.optimizer.rules.temporal.TranslateIntervalExpressionRule;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.rewriter.rules.BreakSelectIntoConjunctsRule;
@@ -127,10 +128,10 @@
 import org.apache.hyracks.algebricks.rewriter.rules.PushSelectIntoJoinRule;
 import org.apache.hyracks.algebricks.rewriter.rules.PushSortDownRule;
 import org.apache.hyracks.algebricks.rewriter.rules.PushSubplanWithAggregateDownThroughProductRule;
-import org.apache.hyracks.algebricks.rewriter.rules.PushUnnestDownThroughUnionRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ReinferAllTypesRule;
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveCartesianProductWithEmptyBranchRule;
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantGroupByDecorVarsRule;
+import org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantVariablesInUnionRule;
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantVariablesRule;
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantWindowOperatorsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnnecessarySortMergeExchange;
@@ -230,6 +231,8 @@
         condPushDownAndJoinInference.add(new RemoveCartesianProductWithEmptyBranchRule());
         condPushDownAndJoinInference.add(new PushMapOperatorDownThroughProductRule());
         condPushDownAndJoinInference.add(new PushSubplanWithAggregateDownThroughProductRule());
+        condPushDownAndJoinInference
+                .add(new AsterixPushMapOperatorThroughUnionRule(LogicalOperatorTag.ASSIGN, LogicalOperatorTag.SELECT));
         condPushDownAndJoinInference.add(new SubplanOutOfGroupRule());
         // The following rule must run before PushAggregateIntoNestedSubplanRule
         // (before common subplans diverge due to aggregate pushdown)
@@ -291,8 +294,8 @@
         consolidation.add(new CountVarToCountOneRule());
         consolidation.add(new RemoveUnusedAssignAndAggregateRule());
         consolidation.add(new RemoveRedundantGroupByDecorVarsRule());
-        //PushUnnestDownUnion => RemoveRedundantListifyRule cause these rules are correlated
-        consolidation.add(new PushUnnestDownThroughUnionRule());
+        //PushUnnestThroughUnion => RemoveRedundantListifyRule cause these rules are correlated
+        consolidation.add(new AsterixPushMapOperatorThroughUnionRule(LogicalOperatorTag.UNNEST));
         consolidation.add(new RemoveRedundantListifyRule());
         // Window operator consolidation rules
         consolidation.add(new AsterixConsolidateWindowOperatorsRule());
@@ -318,9 +321,10 @@
     public static final List<IAlgebraicRewriteRule> buildPlanCleanupRuleCollection() {
         List<IAlgebraicRewriteRule> planCleanupRules = new LinkedList<>();
         planCleanupRules.add(new SwitchInnerJoinBranchRule());
-        planCleanupRules.add(new AsterixPushAssignBelowUnionAllRule());
+        planCleanupRules.add(new AsterixPushMapOperatorThroughUnionRule(LogicalOperatorTag.ASSIGN));
         planCleanupRules.add(new ExtractCommonExpressionsRule());
         planCleanupRules.add(new RemoveRedundantVariablesRule());
+        planCleanupRules.add(new RemoveRedundantVariablesInUnionRule()); // relies on RemoveUnusedAssignAndAggregateRule
         planCleanupRules.add(new PushProjectDownRule());
         planCleanupRules.add(new PushSelectDownRule());
         planCleanupRules.add(new SetClosedRecordConstructorsRule());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AsterixPushAssignBelowUnionAllRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AsterixPushAssignBelowUnionAllRule.java
deleted file mode 100644
index 23ff7c6..0000000
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AsterixPushAssignBelowUnionAllRule.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.optimizer.rules;
-
-import java.util.List;
-
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.constants.AsterixConstantValue;
-import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
-import org.apache.hyracks.algebricks.rewriter.rules.PushAssignBelowUnionAllRule;
-
-public class AsterixPushAssignBelowUnionAllRule extends PushAssignBelowUnionAllRule {
-
-    // modifies field-access-by-index by adjusting the index to match the one in the branch where assign is moved to
-    @Override
-    protected boolean modifyExpression(ILogicalExpression expression, UnionAllOperator unionOp,
-            IOptimizationContext ctx, int inputIndex) throws AlgebricksException {
-        if (expression.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
-            return true; // expressions other than functions need not be modified
-        }
-        AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) expression;
-        List<Mutable<ILogicalExpression>> arguments = functionCall.getArguments();
-        for (int k = 0, size = arguments.size(); k < size; k++) {
-            if (!modifyExpression(arguments.get(k).getValue(), unionOp, ctx, inputIndex)) {
-                return false;
-            }
-        }
-        // return true if any function other than field-access-by-index. otherwise, try to map the index.
-        return !functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FIELD_ACCESS_BY_INDEX)
-                || mapFieldIndex(functionCall, unionOp, ctx, inputIndex);
-    }
-
-    private static boolean mapFieldIndex(AbstractFunctionCallExpression functionCall, UnionAllOperator unionOp,
-            IOptimizationContext ctx, int inputIndex) throws AlgebricksException {
-        // the record variable in the field access should match the output variable from union, i.e. $2.getField
-        ILogicalExpression recordExpr = functionCall.getArguments().get(0).getValue();
-        if (recordExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-            return false;
-        }
-        Integer fieldIndex = ConstantExpressionUtil.getIntArgument(functionCall, 1);
-        if (fieldIndex == null) {
-            return false;
-        }
-        LogicalVariable recordVar = ((VariableReferenceExpression) recordExpr).getVariableReference();
-        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMap : unionOp.getVariableMappings()) {
-            if (varMap.third.equals(recordVar)) {
-                ARecordType unionType = (ARecordType) ctx.getOutputTypeEnvironment(unionOp).getVarType(varMap.third);
-                ILogicalOperator inputOpToUnion = unionOp.getInputs().get(inputIndex).getValue();
-                ARecordType inputType;
-                if (inputIndex == 0) {
-                    inputType = (ARecordType) ctx.getOutputTypeEnvironment(inputOpToUnion).getVarType(varMap.first);
-                } else {
-                    inputType = (ARecordType) ctx.getOutputTypeEnvironment(inputOpToUnion).getVarType(varMap.second);
-                }
-                String fieldName = unionType.getFieldNames()[fieldIndex];
-                fieldIndex = inputType.getFieldIndex(fieldName);
-                if (fieldIndex < 0) {
-                    return false;
-                }
-                functionCall.getArguments().get(1)
-                        .setValue(new ConstantExpression(new AsterixConstantValue(new AInt32(fieldIndex))));
-                return true;
-            }
-        }
-        return false;
-    }
-}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AsterixPushMapOperatorThroughUnionRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AsterixPushMapOperatorThroughUnionRule.java
new file mode 100644
index 0000000..20ad536
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AsterixPushMapOperatorThroughUnionRule.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.optimizer.rules;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.rewriter.rules.PushMapOperatorThroughUnionRule;
+
+public class AsterixPushMapOperatorThroughUnionRule extends PushMapOperatorThroughUnionRule {
+
+    private final FieldAccessByIndexCollector fieldAccessByIndexCollector = new FieldAccessByIndexCollector();
+
+    private final FieldAccessByIndexTransformer fieldAccessByIndexTransformer = new FieldAccessByIndexTransformer();
+
+    private final Set<LogicalOperatorTag> allowedKinds;
+
+    public AsterixPushMapOperatorThroughUnionRule(LogicalOperatorTag... allowedKinds) {
+        if (allowedKinds.length == 0) {
+            throw new IllegalArgumentException();
+        }
+        this.allowedKinds = EnumSet.noneOf(LogicalOperatorTag.class);
+        Collections.addAll(this.allowedKinds, allowedKinds);
+    }
+
+    @Override
+    protected boolean isOperatorKindPushableThroughUnion(ILogicalOperator op) {
+        //TODO(dmitry): support subplan operator
+        return allowedKinds.contains(op.getOperatorTag()) && super.isOperatorKindPushableThroughUnion(op);
+    }
+
+    @Override
+    protected Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyForBranch(ILogicalOperator op,
+            Set<LogicalVariable> opUsedVars, UnionAllOperator unionAllOp, int branchIdx, IOptimizationContext context)
+            throws AlgebricksException {
+
+        if (((AbstractLogicalOperator) op).hasNestedPlans()) {
+            //TODO(dmitry): support subplan operator
+            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+                    op.getOperatorTag().toString());
+        }
+
+        fieldAccessByIndexCollector.reset(unionAllOp, branchIdx, context);
+        op.acceptExpressionTransform(fieldAccessByIndexCollector);
+        if (fieldAccessByIndexCollector.failed) {
+            fieldAccessByIndexCollector.clear();
+            return null;
+        }
+
+        Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> newOpPair =
+                super.deepCopyForBranch(op, opUsedVars, unionAllOp, branchIdx, context);
+
+        if (fieldAccessByIndexCollector.hasFieldAccessMappings()) {
+            fieldAccessByIndexTransformer.reset(unionAllOp, branchIdx, context);
+            newOpPair.first.acceptExpressionTransform(fieldAccessByIndexTransformer);
+            fieldAccessByIndexTransformer.clear();
+        }
+
+        fieldAccessByIndexCollector.clear();
+        return newOpPair;
+    }
+
+    private static final class FieldAccessByIndexCollector extends AbstractFieldAccessByIndexTransformer {
+
+        private final Map<Pair<LogicalVariable, Integer>, Integer> fieldIndexMap = new HashMap<>();
+
+        private boolean failed;
+
+        @Override
+        void reset(UnionAllOperator unionAllOp, int branchIdx, IOptimizationContext context) {
+            super.reset(unionAllOp, branchIdx, context);
+            fieldIndexMap.clear();
+            failed = false;
+        }
+
+        @Override
+        void clear() {
+            super.clear();
+            fieldIndexMap.clear();
+        }
+
+        boolean hasFieldAccessMappings() {
+            return !fieldIndexMap.isEmpty();
+        }
+
+        @Override
+        public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+            visit(exprRef);
+            return false;
+        }
+
+        private void visit(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                return;
+            }
+            AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
+            for (Mutable<ILogicalExpression> argExpr : callExpr.getArguments()) {
+                visit(argExpr);
+                if (failed) {
+                    return;
+                }
+            }
+
+            if (callExpr.getFunctionIdentifier().equals(BuiltinFunctions.FIELD_ACCESS_BY_INDEX)) {
+                boolean mapped = mapFieldIndex(callExpr);
+                if (!mapped) {
+                    failed = true;
+                }
+            }
+        }
+
+        private boolean mapFieldIndex(AbstractFunctionCallExpression callExpr) throws AlgebricksException {
+            // the record variable in the field access should match the output variable from union, i.e. $2.getField
+            ILogicalExpression recordExpr = callExpr.getArguments().get(0).getValue();
+            if (recordExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                return false;
+            }
+            Integer fieldIndexPostUnion = ConstantExpressionUtil.getIntArgument(callExpr, 1);
+            if (fieldIndexPostUnion == null) {
+                return false;
+            }
+            LogicalVariable recordVarPostUnion = ((VariableReferenceExpression) recordExpr).getVariableReference();
+            // 'recordVarPostUnion' is a post-union var, we need to find corresponding pre-union var
+            for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMap : unionAllOp.getVariableMappings()) {
+                if (varMap.third.equals(recordVarPostUnion)) {
+                    LogicalVariable recordVarPreUnion = branchIdx == 0 ? varMap.first : varMap.second;
+
+                    IVariableTypeEnvironment typeEnvPostUnion = context.getOutputTypeEnvironment(unionAllOp);
+                    ARecordType recordTypePostUnion = (ARecordType) typeEnvPostUnion.getVarType(recordVarPostUnion);
+                    String fieldName = recordTypePostUnion.getFieldNames()[fieldIndexPostUnion];
+
+                    ILogicalOperator inputOpToUnion = unionAllOp.getInputs().get(branchIdx).getValue();
+                    IVariableTypeEnvironment typeEnvPreUnion = context.getOutputTypeEnvironment(inputOpToUnion);
+                    ARecordType recordTypePreUnion = (ARecordType) typeEnvPreUnion.getVarType(recordVarPreUnion);
+
+                    int fieldIndexPreUnion = recordTypePreUnion.getFieldIndex(fieldName);
+                    if (fieldIndexPreUnion >= 0) {
+                        // we save 'recordVar' pre-union because super.deepCopyForBranch() will replace
+                        // post-union variables with pre-union variables in the operator's expressions
+                        fieldIndexMap.put(new Pair<>(recordVarPreUnion, fieldIndexPostUnion), fieldIndexPreUnion);
+                        return true;
+                    } else {
+                        return false;
+                    }
+                }
+
+            }
+            return false;
+        }
+
+    }
+
+    private final class FieldAccessByIndexTransformer extends AbstractFieldAccessByIndexTransformer {
+
+        @Override
+        public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                return false;
+            }
+            boolean applied = false;
+            AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
+            for (Mutable<ILogicalExpression> argument : callExpr.getArguments()) {
+                applied |= transform(argument);
+            }
+
+            if (callExpr.getFunctionIdentifier().equals(BuiltinFunctions.FIELD_ACCESS_BY_INDEX)) {
+                transformFieldIndex(callExpr);
+                applied = true;
+            }
+            return applied;
+        }
+
+        private void transformFieldIndex(AbstractFunctionCallExpression callExpr) throws AlgebricksException {
+            ILogicalExpression recordExpr = callExpr.getArguments().get(0).getValue();
+            if (recordExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, callExpr.getSourceLocation(),
+                        recordExpr.getExpressionTag().toString());
+            }
+            Integer fieldIndexPostUnion = ConstantExpressionUtil.getIntArgument(callExpr, 1);
+            if (fieldIndexPostUnion == null) {
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, callExpr.getSourceLocation(), "");
+            }
+            // this 'recordVar' is pre-union because super.deepCopyForBranch() replaced
+            // post-union variables with pre-union variables in the operator's expressions
+            LogicalVariable recordVarPreUnion = ((VariableReferenceExpression) recordExpr).getVariableReference();
+            Integer fieldIndexPreUnion =
+                    fieldAccessByIndexCollector.fieldIndexMap.get(new Pair<>(recordVarPreUnion, fieldIndexPostUnion));
+            if (fieldIndexPreUnion == null) {
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, callExpr.getSourceLocation(),
+                        recordVarPreUnion.toString());
+            }
+            callExpr.getArguments().get(1)
+                    .setValue(new ConstantExpression(new AsterixConstantValue(new AInt32(fieldIndexPreUnion))));
+        }
+    }
+
+    private abstract static class AbstractFieldAccessByIndexTransformer
+            implements ILogicalExpressionReferenceTransform {
+
+        protected UnionAllOperator unionAllOp;
+
+        protected int branchIdx;
+
+        protected IOptimizationContext context;
+
+        void reset(UnionAllOperator unionAllOp, int branchIdx, IOptimizationContext context) {
+            this.unionAllOp = unionAllOp;
+            this.branchIdx = branchIdx;
+            this.context = context;
+        }
+
+        void clear() {
+            unionAllOp = null;
+            context = null;
+        }
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index 014ee40..4c227fa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -57,6 +57,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeTagUtil;
 import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.runtime.base.UnnestingPositionWriterFactory;
 import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -146,7 +147,8 @@
         jobGenCtx = new JobGenContext(null, metadataProvider, appCtx, SerializerDeserializerProvider.INSTANCE,
                 BinaryHashFunctionFactoryProvider.INSTANCE, BinaryHashFunctionFamilyProvider.INSTANCE,
                 BinaryComparatorFactoryProvider.INSTANCE, TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
-                BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, null,
+                BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE,
+                UnnestingPositionWriterFactory.INSTANCE, null,
                 new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())),
                 ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null,
                 NoOpWarningCollector.INSTANCE, 0);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java
index c0d6f82..c82029b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java
@@ -89,8 +89,7 @@
         } else {
             LogicalVariable pVar = context.newVarFromExpression(fc.getPosVarExpr());
             // We set the positional variable type as INT64 type.
-            returnedOp = new UnnestOperator(v, new MutableObject<>(pUnnestExpr.first), pVar, BuiltinType.AINT64,
-                    new PositionWriter());
+            returnedOp = new UnnestOperator(v, new MutableObject<>(pUnnestExpr.first), pVar, BuiltinType.AINT64);
         }
         returnedOp.getInputs().add(pUnnestExpr.second);
         return new Pair<>(returnedOp, v);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index 466a69c..c77774c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -320,8 +320,7 @@
         if (fromTerm.hasPositionalVariable()) {
             LogicalVariable pVar = context.newVarFromExpression(fromTerm.getPositionalVariable());
             // We set the positional variable type as BIGINT type.
-            unnestOp = new UnnestOperator(fromVar, new MutableObject<>(pUnnestExpr.first), pVar, BuiltinType.AINT64,
-                    new PositionWriter());
+            unnestOp = new UnnestOperator(fromVar, new MutableObject<>(pUnnestExpr.first), pVar, BuiltinType.AINT64);
         } else {
             unnestOp = new UnnestOperator(fromVar, new MutableObject<>(pUnnestExpr.first));
         }
@@ -537,10 +536,9 @@
             LogicalVariable pVar = context.newVarFromExpression(binaryCorrelate.getPositionalVariable());
             // We set the positional variable type as BIGINT type.
             unnestOp = innerUnnest
-                    ? new UnnestOperator(rightVar, new MutableObject<>(pUnnestExpr.first), pVar, BuiltinType.AINT64,
-                            new PositionWriter())
+                    ? new UnnestOperator(rightVar, new MutableObject<>(pUnnestExpr.first), pVar, BuiltinType.AINT64)
                     : new LeftOuterUnnestOperator(rightVar, new MutableObject<>(pUnnestExpr.first), pVar,
-                            BuiltinType.AINT64, new PositionWriter());
+                            BuiltinType.AINT64);
         } else {
             unnestOp = innerUnnest ? new UnnestOperator(rightVar, new MutableObject<>(pUnnestExpr.first))
                     : new LeftOuterUnnestOperator(rightVar, new MutableObject<>(pUnnestExpr.first));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index a3dfdf4..a5daab6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -285,6 +285,7 @@
         builder.setHashFunctionFactoryProvider(format.getBinaryHashFunctionFactoryProvider());
         builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider());
         builder.setMissingWriterFactory(format.getMissingWriterFactory());
+        builder.setUnnestingPositionWriterFactory(format.getUnnestingPositionWriterFactory());
         builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider());
         builder.setPrinterProvider(getPrinterFactoryProvider(format, conf.fmt()));
         builder.setSerializerDeserializerProvider(format.getSerdeProvider());
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_1.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_1.sqlpp
new file mode 100644
index 0000000..532d646
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_1.sqlpp
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type onekType1 as
+ closed {
+  unique1 : bigint,
+  unique2 : bigint,
+  two : bigint,
+  four : bigint,
+  ten : bigint,
+  twenty : bigint,
+  onePercent : bigint,
+  tenPercent : bigint,
+  twentyPercent : bigint,
+  fiftyPercent : bigint,
+  unique3 : bigint,
+  evenOnePercent : bigint,
+  oddOnePercent : bigint,
+  stringu1 : string,
+  stringu2 : string,
+  string4 : string
+};
+
+-- switch unique1 and unique2 in the schema
+
+create type onekType2 as
+ closed {
+  unique2 : bigint,
+  unique1 : bigint,
+  two : bigint,
+  four : bigint,
+  ten : bigint,
+  twenty : bigint,
+  onePercent : bigint,
+  tenPercent : bigint,
+  twentyPercent : bigint,
+  fiftyPercent : bigint,
+  unique3 : bigint,
+  evenOnePercent : bigint,
+  oddOnePercent : bigint,
+  stringu1 : string,
+  stringu2 : string,
+  string4 : string
+};
+
+create dataset onek1(onekType1) primary key unique2;
+
+create dataset onek2(onekType2) primary key unique2;
+
+create index onek1_idx on onek1(unique1);
+
+create index onek2_idx on onek2(unique1);
+
+with T as (
+  select value t1 from onek1 t1
+  union all
+  select value t2 from onek2 t2
+)
+select unique1, unique2, unique3 from T
+where unique1 >= 98
+order by unique2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_2.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_2.sqlpp
new file mode 100644
index 0000000..2554acb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_2.sqlpp
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type onekType1 as
+ closed {
+  unique1 : bigint,
+  unique2 : bigint,
+  two : bigint,
+  four : bigint,
+  ten : bigint,
+  twenty : bigint,
+  onePercent : bigint,
+  tenPercent : bigint,
+  twentyPercent : bigint,
+  fiftyPercent : bigint,
+  unique3 : bigint,
+  evenOnePercent : bigint,
+  oddOnePercent : bigint,
+  stringu1 : string,
+  stringu2 : string,
+  string4 : string
+};
+
+-- switch unique1 and unique2 in the schema
+
+create type onekType2 as
+ closed {
+  unique2 : bigint,
+  unique1 : bigint,
+  two : bigint,
+  four : bigint,
+  ten : bigint,
+  twenty : bigint,
+  onePercent : bigint,
+  tenPercent : bigint,
+  twentyPercent : bigint,
+  fiftyPercent : bigint,
+  unique3 : bigint,
+  evenOnePercent : bigint,
+  oddOnePercent : bigint,
+  stringu1 : string,
+  stringu2 : string,
+  string4 : string
+};
+
+create dataset onek1(onekType1) primary key unique2;
+
+create dataset onek2(onekType2) primary key unique2;
+
+create index onek1_idx on onek1(unique1);
+
+create index onek2_idx on onek2(unique1);
+
+with T as (
+  select unique1, unique2, unique3 from onek1
+  union all
+  select unique1, unique2, unique3 from onek2
+)
+select value t from T t
+where unique1 >= 98
+order by unique2;
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_3.sqlpp
similarity index 63%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_3.sqlpp
index 092d22e..73d6c95 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_3.sqlpp
@@ -16,11 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.base;
 
-import java.io.DataOutput;
-import java.io.IOException;
+drop  dataverse test if exists;
+create  dataverse test;
 
-public interface IUnnestingPositionWriter {
-    public void write(DataOutput dataOutput, long position) throws IOException;
-}
+use test;
+
+create type fbuType as
+{
+  id : bigint
+};
+
+create dataset fbu1(fbuType) primary key id;
+
+create dataset fbu2(fbuType) primary key id;
+
+create index fbu1_idx on fbu1(alias: string);
+
+create index fbu2_idx on fbu2(alias: string);
+
+with T as (
+  select value t1 from fbu1 t1
+  union all
+  select value t2 from fbu2 t2
+)
+select alias, name from T
+where alias >= "Von"
+order by alias;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_4.sqlpp
similarity index 63%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_4.sqlpp
index 092d22e..260ef94 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/union/union_opt_1_4.sqlpp
@@ -16,11 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.base;
 
-import java.io.DataOutput;
-import java.io.IOException;
+drop  dataverse test if exists;
+create  dataverse test;
 
-public interface IUnnestingPositionWriter {
-    public void write(DataOutput dataOutput, long position) throws IOException;
-}
+use test;
+
+create type fbuType as
+{
+  id : bigint
+};
+
+create dataset fbu1(fbuType) primary key id;
+
+create dataset fbu2(fbuType) primary key id;
+
+create index fbu1_idx on fbu1(alias: string);
+
+create index fbu2_idx on fbu2(alias: string);
+
+with T as (
+  select alias, name from fbu1
+  union all
+  select alias, name from fbu2
+)
+select value t from T t
+where alias >= "Von"
+order by alias;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.5.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.5.plan
index 92f10c3..07c478a 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.5.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.5.plan
@@ -1,94 +1,90 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- SORT_MERGE_EXCHANGE [$$128(ASC) ]  |PARTITIONED|
-        -- STABLE_SORT [$$128(ASC)]  |PARTITIONED|
+      -- SORT_MERGE_EXCHANGE [$$197(ASC) ]  |PARTITIONED|
+        -- STABLE_SORT [$$197(ASC)]  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- UNION_ALL  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ASSIGN  |PARTITIONED|
-                    -- STREAM_PROJECT  |PARTITIONED|
-                      -- ASSIGN  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- PRE_CLUSTERED_GROUP_BY[$$244]  |PARTITIONED|
+                              {
+                                -- AGGREGATE  |LOCAL|
+                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                              }
+                              {
+                                -- AGGREGATE  |LOCAL|
+                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                              }
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- PRE_CLUSTERED_GROUP_BY[$$133]  |PARTITIONED|
-                                  {
-                                    -- AGGREGATE  |LOCAL|
-                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                  }
-                                  {
-                                    -- AGGREGATE  |LOCAL|
-                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                  }
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- STABLE_SORT [$$133(ASC)]  |PARTITIONED|
-                                -- HASH_PARTITION_EXCHANGE [$$133]  |PARTITIONED|
-                                  -- PRE_CLUSTERED_GROUP_BY[$$112]  |PARTITIONED|
-                                          {
-                                            -- AGGREGATE  |LOCAL|
-                                              -- STREAM_SELECT  |LOCAL|
-                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                          }
-                                          {
-                                            -- AGGREGATE  |LOCAL|
-                                              -- STREAM_SELECT  |LOCAL|
-                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                          }
+                          -- STABLE_SORT [$$244(ASC)]  |PARTITIONED|
+                            -- HASH_PARTITION_EXCHANGE [$$244]  |PARTITIONED|
+                              -- PRE_CLUSTERED_GROUP_BY[$$115]  |PARTITIONED|
+                                      {
+                                        -- AGGREGATE  |LOCAL|
+                                          -- STREAM_SELECT  |LOCAL|
+                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                      }
+                                      {
+                                        -- AGGREGATE  |LOCAL|
+                                          -- STREAM_SELECT  |LOCAL|
+                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                      }
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STABLE_SORT [$$115(ASC)]  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STABLE_SORT [$$112(ASC)]  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- REPLICATE  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- REPLICATE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- DATASOURCE_SCAN  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ASSIGN  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
-                      -- ASSIGN  |PARTITIONED|
-                        -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- PRE_CLUSTERED_GROUP_BY[$$247]  |PARTITIONED|
+                                {
+                                  -- AGGREGATE  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                }
+                                {
+                                  -- AGGREGATE  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                }
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- PRE_CLUSTERED_GROUP_BY[$$136]  |PARTITIONED|
-                                    {
-                                      -- AGGREGATE  |LOCAL|
-                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                    }
-                                    {
-                                      -- AGGREGATE  |LOCAL|
-                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                    }
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- STABLE_SORT [$$136(ASC)]  |PARTITIONED|
-                                  -- HASH_PARTITION_EXCHANGE [$$136]  |PARTITIONED|
-                                    -- PRE_CLUSTERED_GROUP_BY[$$113]  |PARTITIONED|
-                                            {
-                                              -- AGGREGATE  |LOCAL|
-                                                -- STREAM_SELECT  |LOCAL|
-                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                            }
-                                            {
-                                              -- AGGREGATE  |LOCAL|
-                                                -- STREAM_SELECT  |LOCAL|
-                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                            }
+                            -- STABLE_SORT [$$247(ASC)]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$247]  |PARTITIONED|
+                                -- PRE_CLUSTERED_GROUP_BY[$$116]  |PARTITIONED|
+                                        {
+                                          -- AGGREGATE  |LOCAL|
+                                            -- STREAM_SELECT  |LOCAL|
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                        }
+                                        {
+                                          -- AGGREGATE  |LOCAL|
+                                            -- STREAM_SELECT  |LOCAL|
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                        }
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STABLE_SORT [$$116(ASC)]  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- STABLE_SORT [$$113(ASC)]  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- REPLICATE  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- REPLICATE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- DATASOURCE_SCAN  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- DATASOURCE_SCAN  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/common-expr-01.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/common-expr-01.plan
index 83fc968..c7b21c2 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/common-expr-01.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/common-expr-01.plan
@@ -3,13 +3,13 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-          -- SORT_GROUP_BY[$$176, $$177]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$185, $$186]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$176, $$177]  |PARTITIONED|
-              -- SORT_GROUP_BY[$$159, $$160]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$185, $$186]  |PARTITIONED|
+              -- SORT_GROUP_BY[$$168, $$169]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
                           -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -26,15 +26,15 @@
                                   -- STREAM_SELECT  |PARTITIONED|
                                     -- STREAM_PROJECT  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- PRE_CLUSTERED_GROUP_BY[$$163]  |PARTITIONED|
+                                        -- PRE_CLUSTERED_GROUP_BY[$$172]  |PARTITIONED|
                                                 {
                                                   -- AGGREGATE  |LOCAL|
                                                     -- STREAM_SELECT  |LOCAL|
                                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                 }
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- STABLE_SORT [$$163(ASC)]  |PARTITIONED|
-                                              -- HASH_PARTITION_EXCHANGE [$$163]  |PARTITIONED|
+                                            -- STABLE_SORT [$$172(ASC)]  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$172]  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                     -- NESTED_LOOP  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/grouping-sets-1.1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/grouping-sets-1.1.plan
index 72d2bb1..aec6087 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/grouping-sets-1.1.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/grouping-sets-1.1.plan
@@ -1,8 +1,8 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- SORT_MERGE_EXCHANGE [$$245(ASC), $$246(ASC), $$247(ASC) ]  |PARTITIONED|
-        -- STABLE_SORT [$$245(ASC), $$246(ASC), $$247(ASC)]  |PARTITIONED|
+      -- SORT_MERGE_EXCHANGE [$$385(ASC), $$386(ASC), $$387(ASC) ]  |PARTITIONED|
+        -- STABLE_SORT [$$385(ASC), $$386(ASC), $$387(ASC)]  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- UNION_ALL  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -14,74 +14,14 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- STREAM_PROJECT  |PARTITIONED|
                               -- ASSIGN  |PARTITIONED|
-                                -- STREAM_PROJECT  |PARTITIONED|
-                                  -- ASSIGN  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- SORT_GROUP_BY[$$253]  |PARTITIONED|
-                                              {
-                                                -- AGGREGATE  |LOCAL|
-                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                              }
-                                        -- HASH_PARTITION_EXCHANGE [$$253]  |PARTITIONED|
-                                          -- SORT_GROUP_BY[$$232]  |PARTITIONED|
-                                                  {
-                                                    -- AGGREGATE  |LOCAL|
-                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                  }
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    -- ASSIGN  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- REPLICATE  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- STREAM_PROJECT  |PARTITIONED|
-                                  -- ASSIGN  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- SORT_GROUP_BY[$$255]  |PARTITIONED|
-                                              {
-                                                -- AGGREGATE  |LOCAL|
-                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                              }
-                                        -- HASH_PARTITION_EXCHANGE [$$255]  |PARTITIONED|
-                                          -- SORT_GROUP_BY[$$233]  |PARTITIONED|
-                                                  {
-                                                    -- AGGREGATE  |LOCAL|
-                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                  }
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- REPLICATE  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- STREAM_PROJECT  |PARTITIONED|
-                          -- ASSIGN  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- SORT_GROUP_BY[$$257, $$258]  |PARTITIONED|
+                                  -- SORT_GROUP_BY[$$1077]  |PARTITIONED|
                                           {
                                             -- AGGREGATE  |LOCAL|
                                               -- NESTED_TUPLE_SOURCE  |LOCAL|
                                           }
-                                    -- HASH_PARTITION_EXCHANGE [$$257, $$258]  |PARTITIONED|
-                                      -- SORT_GROUP_BY[$$234, $$235]  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$1077]  |PARTITIONED|
+                                      -- SORT_GROUP_BY[$$238]  |PARTITIONED|
                                               {
                                                 -- AGGREGATE  |LOCAL|
                                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -99,33 +39,85 @@
                                                             -- DATASOURCE_SCAN  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STREAM_PROJECT  |PARTITIONED|
-                  -- ASSIGN  |PARTITIONED|
-                    -- STREAM_PROJECT  |PARTITIONED|
-                      -- ASSIGN  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- SORT_GROUP_BY[$$260]  |PARTITIONED|
-                                  {
-                                    -- AGGREGATE  |LOCAL|
-                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                  }
-                            -- HASH_PARTITION_EXCHANGE [$$260]  |PARTITIONED|
-                              -- SORT_GROUP_BY[$$236]  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- SORT_GROUP_BY[$$1079]  |PARTITIONED|
+                                          {
+                                            -- AGGREGATE  |LOCAL|
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          }
+                                    -- HASH_PARTITION_EXCHANGE [$$1079]  |PARTITIONED|
+                                      -- SORT_GROUP_BY[$$239]  |PARTITIONED|
+                                              {
+                                                -- AGGREGATE  |LOCAL|
+                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                              }
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- REPLICATE  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- SORT_GROUP_BY[$$1081, $$1082]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
                                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$1081, $$1082]  |PARTITIONED|
+                                  -- SORT_GROUP_BY[$$240, $$241]  |PARTITIONED|
+                                          {
+                                            -- AGGREGATE  |LOCAL|
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          }
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- REPLICATE  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                -- REPLICATE  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- SORT_GROUP_BY[$$1084]  |PARTITIONED|
+                              {
+                                -- AGGREGATE  |LOCAL|
+                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                              }
+                        -- HASH_PARTITION_EXCHANGE [$$1084]  |PARTITIONED|
+                          -- SORT_GROUP_BY[$$242]  |PARTITIONED|
+                                  {
+                                    -- AGGREGATE  |LOCAL|
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                  }
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- REPLICATE  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/grouping-sets-1.2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/grouping-sets-1.2.plan
index bea5ef6..ed348a1 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/grouping-sets-1.2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/grouping-sets-1.2.plan
@@ -1,8 +1,8 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- SORT_MERGE_EXCHANGE [$$759(ASC), $$760(ASC), $$761(ASC), $$762(ASC) ]  |PARTITIONED|
-        -- STABLE_SORT [$$759(ASC), $$760(ASC), $$761(ASC), $$762(ASC)]  |PARTITIONED|
+      -- SORT_MERGE_EXCHANGE [$$1224(ASC), $$1225(ASC), $$1226(ASC), $$1227(ASC) ]  |PARTITIONED|
+        -- STABLE_SORT [$$1224(ASC), $$1225(ASC), $$1226(ASC), $$1227(ASC)]  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- UNION_ALL  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -12,7 +12,7 @@
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                         -- UNION_ALL  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- UNION_ALL  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                 -- UNION_ALL  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -26,350 +26,339 @@
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                     -- UNION_ALL  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- ASSIGN  |PARTITIONED|
-                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                            -- ASSIGN  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- SORT_GROUP_BY[$$776, $$777, $$778, $$779]  |PARTITIONED|
-                                                                        {
-                                                                          -- AGGREGATE  |LOCAL|
-                                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                        }
-                                                                  -- HASH_PARTITION_EXCHANGE [$$776, $$777, $$778, $$779]  |PARTITIONED|
-                                                                    -- SORT_GROUP_BY[$$710, $$711, $$712, $$713]  |PARTITIONED|
-                                                                            {
-                                                                              -- AGGREGATE  |LOCAL|
-                                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                            }
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                              -- ASSIGN  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- REPLICATE  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- UNION_ALL  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- SORT_GROUP_BY[$$10189, $$10190, $$10191, $$10192]  |PARTITIONED|
+                                                                          {
+                                                                            -- AGGREGATE  |LOCAL|
+                                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                          }
+                                                                    -- HASH_PARTITION_EXCHANGE [$$10189, $$10190, $$10191, $$10192]  |PARTITIONED|
+                                                                      -- SORT_GROUP_BY[$$717, $$718, $$719, $$720]  |PARTITIONED|
+                                                                              {
+                                                                                -- AGGREGATE  |LOCAL|
+                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                              }
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- REPLICATE  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- SORT_GROUP_BY[$$10194, $$10195, $$10196]  |PARTITIONED|
+                                                                          {
+                                                                            -- AGGREGATE  |LOCAL|
+                                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                          }
+                                                                    -- HASH_PARTITION_EXCHANGE [$$10194, $$10195, $$10196]  |PARTITIONED|
+                                                                      -- SORT_GROUP_BY[$$721, $$722, $$723]  |PARTITIONED|
+                                                                              {
+                                                                                -- AGGREGATE  |LOCAL|
+                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                              }
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- REPLICATE  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- SORT_GROUP_BY[$$10198, $$10199, $$10200]  |PARTITIONED|
+                                                                      {
+                                                                        -- AGGREGATE  |LOCAL|
+                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                      }
+                                                                -- HASH_PARTITION_EXCHANGE [$$10198, $$10199, $$10200]  |PARTITIONED|
+                                                                  -- SORT_GROUP_BY[$$724, $$725, $$726]  |PARTITIONED|
+                                                                          {
+                                                                            -- AGGREGATE  |LOCAL|
+                                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                          }
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- REPLICATE  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- SORT_GROUP_BY[$$10202, $$10203]  |PARTITIONED|
+                                                                  {
+                                                                    -- AGGREGATE  |LOCAL|
+                                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                  }
+                                                            -- HASH_PARTITION_EXCHANGE [$$10202, $$10203]  |PARTITIONED|
+                                                              -- SORT_GROUP_BY[$$727, $$728]  |PARTITIONED|
+                                                                      {
+                                                                        -- AGGREGATE  |LOCAL|
+                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                      }
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ASSIGN  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- REPLICATE  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- SORT_GROUP_BY[$$10205, $$10206, $$10207]  |PARTITIONED|
+                                                              {
+                                                                -- AGGREGATE  |LOCAL|
+                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                              }
+                                                        -- HASH_PARTITION_EXCHANGE [$$10205, $$10206, $$10207]  |PARTITIONED|
+                                                          -- SORT_GROUP_BY[$$729, $$730, $$731]  |PARTITIONED|
+                                                                  {
+                                                                    -- AGGREGATE  |LOCAL|
+                                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                  }
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ASSIGN  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- REPLICATE  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- SORT_GROUP_BY[$$10209, $$10210]  |PARTITIONED|
+                                                          {
+                                                            -- AGGREGATE  |LOCAL|
+                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                          }
+                                                    -- HASH_PARTITION_EXCHANGE [$$10209, $$10210]  |PARTITIONED|
+                                                      -- SORT_GROUP_BY[$$732, $$733]  |PARTITIONED|
+                                                              {
+                                                                -- AGGREGATE  |LOCAL|
+                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                              }
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- REPLICATE  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- SORT_GROUP_BY[$$10212, $$10213]  |PARTITIONED|
+                                                      {
+                                                        -- AGGREGATE  |LOCAL|
+                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                      }
+                                                -- HASH_PARTITION_EXCHANGE [$$10212, $$10213]  |PARTITIONED|
+                                                  -- SORT_GROUP_BY[$$734, $$735]  |PARTITIONED|
+                                                          {
+                                                            -- AGGREGATE  |LOCAL|
+                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                          }
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
                                                         -- ASSIGN  |PARTITIONED|
                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                             -- ASSIGN  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- SORT_GROUP_BY[$$781, $$782, $$783]  |PARTITIONED|
-                                                                        {
-                                                                          -- AGGREGATE  |LOCAL|
-                                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                        }
-                                                                  -- HASH_PARTITION_EXCHANGE [$$781, $$782, $$783]  |PARTITIONED|
-                                                                    -- SORT_GROUP_BY[$$714, $$715, $$716]  |PARTITIONED|
-                                                                            {
-                                                                              -- AGGREGATE  |LOCAL|
-                                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                            }
+                                                                -- REPLICATE  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- SORT_GROUP_BY[$$10215]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- HASH_PARTITION_EXCHANGE [$$10215]  |PARTITIONED|
+                                              -- SORT_GROUP_BY[$$736]  |PARTITIONED|
+                                                      {
+                                                        -- AGGREGATE  |LOCAL|
+                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                      }
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
                                                     -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                         -- ASSIGN  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- SORT_GROUP_BY[$$785, $$786, $$787]  |PARTITIONED|
-                                                                    {
-                                                                      -- AGGREGATE  |LOCAL|
-                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                    }
-                                                              -- HASH_PARTITION_EXCHANGE [$$785, $$786, $$787]  |PARTITIONED|
-                                                                -- SORT_GROUP_BY[$$717, $$718, $$719]  |PARTITIONED|
-                                                                        {
-                                                                          -- AGGREGATE  |LOCAL|
-                                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                        }
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                      -- ASSIGN  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    -- ASSIGN  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- SORT_GROUP_BY[$$789, $$790]  |PARTITIONED|
-                                                                {
-                                                                  -- AGGREGATE  |LOCAL|
-                                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                }
-                                                          -- HASH_PARTITION_EXCHANGE [$$789, $$790]  |PARTITIONED|
-                                                            -- SORT_GROUP_BY[$$720, $$721]  |PARTITIONED|
-                                                                    {
-                                                                      -- AGGREGATE  |LOCAL|
-                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                    }
+                                                            -- REPLICATE  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ASSIGN  |PARTITIONED|
-                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                      -- ASSIGN  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- REPLICATE  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- SORT_GROUP_BY[$$10217, $$10218]  |PARTITIONED|
+                                          {
+                                            -- AGGREGATE  |LOCAL|
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          }
+                                    -- HASH_PARTITION_EXCHANGE [$$10217, $$10218]  |PARTITIONED|
+                                      -- SORT_GROUP_BY[$$737, $$738]  |PARTITIONED|
+                                              {
+                                                -- AGGREGATE  |LOCAL|
+                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                              }
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
                                                 -- ASSIGN  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- SORT_GROUP_BY[$$792, $$793, $$794]  |PARTITIONED|
-                                                            {
-                                                              -- AGGREGATE  |LOCAL|
-                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                            }
-                                                      -- HASH_PARTITION_EXCHANGE [$$792, $$793, $$794]  |PARTITIONED|
-                                                        -- SORT_GROUP_BY[$$722, $$723, $$724]  |PARTITIONED|
-                                                                {
-                                                                  -- AGGREGATE  |LOCAL|
-                                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                }
+                                                    -- REPLICATE  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                              -- ASSIGN  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ASSIGN  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- REPLICATE  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- SORT_GROUP_BY[$$10220]  |PARTITIONED|
+                                      {
+                                        -- AGGREGATE  |LOCAL|
+                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                      }
+                                -- HASH_PARTITION_EXCHANGE [$$10220]  |PARTITIONED|
+                                  -- SORT_GROUP_BY[$$739]  |PARTITIONED|
+                                          {
+                                            -- AGGREGATE  |LOCAL|
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          }
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- SORT_GROUP_BY[$$796, $$797]  |PARTITIONED|
-                                                        {
-                                                          -- AGGREGATE  |LOCAL|
-                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                        }
-                                                  -- HASH_PARTITION_EXCHANGE [$$796, $$797]  |PARTITIONED|
-                                                    -- SORT_GROUP_BY[$$725, $$726]  |PARTITIONED|
-                                                            {
-                                                              -- AGGREGATE  |LOCAL|
-                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                            }
+                                                -- REPLICATE  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                          -- ASSIGN  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                              -- ASSIGN  |PARTITIONED|
-                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- REPLICATE  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- SORT_GROUP_BY[$$10222]  |PARTITIONED|
+                                  {
+                                    -- AGGREGATE  |LOCAL|
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                  }
+                            -- HASH_PARTITION_EXCHANGE [$$10222]  |PARTITIONED|
+                              -- SORT_GROUP_BY[$$740]  |PARTITIONED|
+                                      {
+                                        -- AGGREGATE  |LOCAL|
+                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                      }
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
                                     -- ASSIGN  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- SORT_GROUP_BY[$$799, $$800]  |PARTITIONED|
-                                                    {
-                                                      -- AGGREGATE  |LOCAL|
-                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                    }
-                                              -- HASH_PARTITION_EXCHANGE [$$799, $$800]  |PARTITIONED|
-                                                -- SORT_GROUP_BY[$$727, $$728]  |PARTITIONED|
-                                                        {
-                                                          -- AGGREGATE  |LOCAL|
-                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                        }
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                      -- ASSIGN  |PARTITIONED|
-                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                          -- ASSIGN  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- REPLICATE  |PARTITIONED|
-                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- SORT_GROUP_BY[$$802]  |PARTITIONED|
-                                                {
-                                                  -- AGGREGATE  |LOCAL|
-                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                }
-                                          -- HASH_PARTITION_EXCHANGE [$$802]  |PARTITIONED|
-                                            -- SORT_GROUP_BY[$$729]  |PARTITIONED|
-                                                    {
-                                                      -- AGGREGATE  |LOCAL|
-                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                    }
+                                            -- REPLICATE  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
-                                                  -- ASSIGN  |PARTITIONED|
-                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                      -- ASSIGN  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- REPLICATE  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- SORT_GROUP_BY[$$804, $$805]  |PARTITIONED|
-                                            {
-                                              -- AGGREGATE  |LOCAL|
-                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                            }
-                                      -- HASH_PARTITION_EXCHANGE [$$804, $$805]  |PARTITIONED|
-                                        -- SORT_GROUP_BY[$$730, $$731]  |PARTITIONED|
-                                                {
-                                                  -- AGGREGATE  |LOCAL|
-                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                }
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ASSIGN  |PARTITIONED|
-                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                  -- ASSIGN  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- REPLICATE  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- SORT_GROUP_BY[$$807]  |PARTITIONED|
-                                        {
-                                          -- AGGREGATE  |LOCAL|
-                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                        }
-                                  -- HASH_PARTITION_EXCHANGE [$$807]  |PARTITIONED|
-                                    -- SORT_GROUP_BY[$$732]  |PARTITIONED|
-                                            {
-                                              -- AGGREGATE  |LOCAL|
-                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                            }
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- ASSIGN  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ASSIGN  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- REPLICATE  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- DATASOURCE_SCAN  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- ASSIGN  |PARTITIONED|
-                      -- STREAM_PROJECT  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- SORT_GROUP_BY[$$809]  |PARTITIONED|
-                                    {
-                                      -- AGGREGATE  |LOCAL|
-                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                    }
-                              -- HASH_PARTITION_EXCHANGE [$$809]  |PARTITIONED|
-                                -- SORT_GROUP_BY[$$733]  |PARTITIONED|
-                                        {
-                                          -- AGGREGATE  |LOCAL|
-                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                        }
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- STREAM_PROJECT  |PARTITIONED|
-                                      -- ASSIGN  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- ASSIGN  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- REPLICATE  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ASSIGN  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
-                      -- ASSIGN  |PARTITIONED|
-                        -- STREAM_PROJECT  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- SORT_GROUP_BY[$$811]  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- SORT_GROUP_BY[$$10224]  |PARTITIONED|
+                                {
+                                  -- AGGREGATE  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                }
+                          -- HASH_PARTITION_EXCHANGE [$$10224]  |PARTITIONED|
+                            -- SORT_GROUP_BY[$$741]  |PARTITIONED|
                                     {
                                       -- AGGREGATE  |LOCAL|
                                         -- NESTED_TUPLE_SOURCE  |LOCAL|
                                     }
-                              -- HASH_PARTITION_EXCHANGE [$$811]  |PARTITIONED|
-                                -- SORT_GROUP_BY[$$734]  |PARTITIONED|
-                                        {
-                                          -- AGGREGATE  |LOCAL|
-                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                        }
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
                                     -- STREAM_PROJECT  |PARTITIONED|
                                       -- ASSIGN  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- REPLICATE  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- REPLICATE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- DATASOURCE_SCAN  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2354.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2354.plan
index 0181a84..dbcdb5b 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2354.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2354.plan
@@ -1,46 +1,45 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- SORT_MERGE_EXCHANGE [$$74(ASC), $$75(ASC), $$76(ASC) ]  |PARTITIONED|
-        -- STABLE_SORT [$$74(ASC), $$75(ASC), $$76(ASC)]  |PARTITIONED|
+      -- SORT_MERGE_EXCHANGE [$$114(ASC), $$115(ASC), $$116(ASC) ]  |PARTITIONED|
+        -- STABLE_SORT [$$114(ASC), $$115(ASC), $$116(ASC)]  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- UNION_ALL  |PARTITIONED|
-              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
-                  -- ASSIGN  |PARTITIONED|
+                  -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
                       -- ASSIGN  |PARTITIONED|
-                        -- STREAM_PROJECT  |PARTITIONED|
-                          -- ASSIGN  |PARTITIONED|
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- REPLICATE  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- STREAM_SELECT  |PARTITIONED|
-                                    -- STREAM_PROJECT  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- REPLICATE  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_SELECT  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ASSIGN  |PARTITIONED|
-                    -- STREAM_PROJECT  |PARTITIONED|
-                      -- ASSIGN  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- NESTED_LOOP  |PARTITIONED|
-                            -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- NESTED_LOOP  |PARTITIONED|
+                        -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                          -- STREAM_SELECT  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                          -- REPLICATE  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- STREAM_SELECT  |PARTITIONED|
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- DATASOURCE_SCAN  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                         -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                            -- BROADCAST_EXCHANGE  |PARTITIONED|
-                              -- REPLICATE  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- STREAM_SELECT  |PARTITIONED|
-                                    -- STREAM_PROJECT  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2354_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2354_ps.plan
index a17555e..6e96ce5 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2354_ps.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2354_ps.plan
@@ -2,52 +2,51 @@
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$74(ASC), $$75(ASC), $$76(ASC)]  |PARTITIONED|
-          -- RANGE_PARTITION_EXCHANGE [$$74(ASC), $$75(ASC), $$76(ASC)]  |PARTITIONED|
+        -- STABLE_SORT [$$114(ASC), $$115(ASC), $$116(ASC)]  |PARTITIONED|
+          -- RANGE_PARTITION_EXCHANGE [$$114(ASC), $$115(ASC), $$116(ASC)]  |PARTITIONED|
             -- FORWARD  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- REPLICATE  |PARTITIONED|
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                     -- UNION_ALL  |PARTITIONED|
-                      -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
-                          -- ASSIGN  |PARTITIONED|
+                          -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
                             -- STREAM_PROJECT  |PARTITIONED|
                               -- ASSIGN  |PARTITIONED|
-                                -- STREAM_PROJECT  |PARTITIONED|
-                                  -- ASSIGN  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- REPLICATE  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- STREAM_SELECT  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- REPLICATE  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_SELECT  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
                           -- ASSIGN  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- NESTED_LOOP  |PARTITIONED|
-                                    -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- NESTED_LOOP  |PARTITIONED|
+                                -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_SELECT  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                  -- REPLICATE  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                       -- STREAM_SELECT  |PARTITIONED|
                                         -- STREAM_PROJECT  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                             -- DATASOURCE_SCAN  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                      -- REPLICATE  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- STREAM_SELECT  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
               -- BROADCAST_EXCHANGE  |PARTITIONED|
                 -- AGGREGATE  |UNPARTITIONED|
                   -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
@@ -57,42 +56,41 @@
                           -- REPLICATE  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- UNION_ALL  |PARTITIONED|
-                                -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
+                                    -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- REPLICATE  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- STREAM_SELECT  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- DATASOURCE_SCAN  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- REPLICATE  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
                                     -- ASSIGN  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- NESTED_LOOP  |PARTITIONED|
-                                              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- NESTED_LOOP  |PARTITIONED|
+                                          -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_SELECT  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                            -- REPLICATE  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                 -- STREAM_SELECT  |PARTITIONED|
                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                       -- DATASOURCE_SCAN  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                           -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                -- REPLICATE  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- STREAM_SELECT  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- DATASOURCE_SCAN  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
index 80a0089..160ab94 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
@@ -16,7 +16,7 @@
                 -- STREAM_PROJECT  |LOCAL|
                   -- ASSIGN  |LOCAL|
                     -- ONE_TO_ONE_EXCHANGE  |LOCAL|
-                      -- PRE_CLUSTERED_GROUP_BY[$$159]  |LOCAL|
+                      -- PRE_CLUSTERED_GROUP_BY[$$166]  |LOCAL|
                               {
                                 -- AGGREGATE  |LOCAL|
                                   -- AGGREGATE  |LOCAL|
@@ -26,9 +26,9 @@
                         -- ONE_TO_ONE_EXCHANGE  |LOCAL|
                           -- STREAM_PROJECT  |LOCAL|
                             -- ONE_TO_ONE_EXCHANGE  |LOCAL|
-                              -- HYBRID_HASH_JOIN [$$159][$$160]  |LOCAL|
+                              -- HYBRID_HASH_JOIN [$$166][$$167]  |LOCAL|
                                 -- ONE_TO_ONE_EXCHANGE  |LOCAL|
-                                  -- PRE_CLUSTERED_GROUP_BY[$$95]  |LOCAL|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$102]  |LOCAL|
                                           {
                                             -- AGGREGATE  |LOCAL|
                                               -- AGGREGATE  |LOCAL|
@@ -36,11 +36,11 @@
                                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
                                           }
                                     -- ONE_TO_ONE_EXCHANGE  |LOCAL|
-                                      -- STABLE_SORT [$$95(ASC)]  |LOCAL|
+                                      -- STABLE_SORT [$$102(ASC)]  |LOCAL|
                                         -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
                                           -- STREAM_PROJECT  |UNPARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                              -- HYBRID_HASH_JOIN [$$95][$$152]  |UNPARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$102][$$159]  |UNPARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
                                                   -- STREAM_PROJECT  |UNPARTITIONED|
                                                     -- ASSIGN  |UNPARTITIONED|
@@ -123,7 +123,7 @@
                                           -- STREAM_SELECT  |LOCAL|
                                             -- ASSIGN  |LOCAL|
                                               -- ONE_TO_ONE_EXCHANGE  |LOCAL|
-                                                -- PRE_CLUSTERED_GROUP_BY[$$161]  |LOCAL|
+                                                -- PRE_CLUSTERED_GROUP_BY[$$168]  |LOCAL|
                                                         {
                                                           -- AGGREGATE  |LOCAL|
                                                             -- AGGREGATE  |LOCAL|
@@ -131,11 +131,11 @@
                                                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                         }
                                                   -- ONE_TO_ONE_EXCHANGE  |LOCAL|
-                                                    -- STABLE_SORT [$$161(ASC)]  |LOCAL|
+                                                    -- STABLE_SORT [$$168(ASC)]  |LOCAL|
                                                       -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
                                                         -- STREAM_PROJECT  |UNPARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                            -- HYBRID_HASH_JOIN [$$161][$$162]  |UNPARTITIONED|
+                                                            -- HYBRID_HASH_JOIN [$$168][$$169]  |UNPARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
                                                                 -- REPLICATE  |UNPARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_dataset.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_dataset.plan
index fdc7d4c..5a955a0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_dataset.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_dataset.plan
@@ -1,21 +1,19 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- SORT_MERGE_EXCHANGE [$$11(ASC) ]  |PARTITIONED|
-        -- STABLE_SORT [$$11(ASC)]  |PARTITIONED|
+      -- SORT_MERGE_EXCHANGE [$$21(ASC) ]  |PARTITIONED|
+        -- STABLE_SORT [$$21(ASC)]  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- UNION_ALL  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- ASSIGN  |PARTITIONED|
-                  -- STREAM_PROJECT  |PARTITIONED|
-                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- DATASOURCE_SCAN  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- ASSIGN  |PARTITIONED|
-                  -- STREAM_PROJECT  |PARTITIONED|
-                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- DATASOURCE_SCAN  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_dataset3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_dataset3.plan
index 9d51d1c..0772bd8 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_dataset3.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_dataset3.plan
@@ -1,21 +1,19 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- SORT_MERGE_EXCHANGE [$$10(ASC) ]  |PARTITIONED|
-        -- STABLE_SORT [$$10(ASC)]  |PARTITIONED|
+      -- SORT_MERGE_EXCHANGE [$$20(ASC) ]  |PARTITIONED|
+        -- STABLE_SORT [$$20(ASC)]  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- UNION_ALL  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- ASSIGN  |PARTITIONED|
-                  -- STREAM_PROJECT  |PARTITIONED|
-                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- DATASOURCE_SCAN  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- ASSIGN  |PARTITIONED|
-                  -- STREAM_PROJECT  |PARTITIONED|
-                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- DATASOURCE_SCAN  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_1.plan
new file mode 100644
index 0000000..7537523
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_1.plan
@@ -0,0 +1,47 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$122(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$122(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- UNION_ALL  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- STREAM_SELECT  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STABLE_SORT [$$165(ASC)]  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- BTREE_SEARCH  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- REPLICATE  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- STREAM_SELECT  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STABLE_SORT [$$168(ASC)]  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- BTREE_SEARCH  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- REPLICATE  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_2.plan
new file mode 100644
index 0000000..08770db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_2.plan
@@ -0,0 +1,48 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- SORT_MERGE_EXCHANGE [$$116(ASC) ]  |PARTITIONED|
+        -- STABLE_SORT [$$116(ASC)]  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- UNION_ALL  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- STREAM_SELECT  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STABLE_SORT [$$141(ASC)]  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- BTREE_SEARCH  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- REPLICATE  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- STREAM_SELECT  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STABLE_SORT [$$144(ASC)]  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- BTREE_SEARCH  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- REPLICATE  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_3.plan
new file mode 100644
index 0000000..6a422c4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_3.plan
@@ -0,0 +1,49 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$60(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$60(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- UNION_ALL  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- STREAM_SELECT  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- BTREE_SEARCH  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STABLE_SORT [$$163(ASC)]  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- BTREE_SEARCH  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- REPLICATE  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- STREAM_SELECT  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- BTREE_SEARCH  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STABLE_SORT [$$166(ASC)]  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- BTREE_SEARCH  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- REPLICATE  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_4.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_4.plan
new file mode 100644
index 0000000..85a6667
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_opt_1_4.plan
@@ -0,0 +1,50 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- SORT_MERGE_EXCHANGE [$$112(ASC) ]  |PARTITIONED|
+        -- STABLE_SORT [$$112(ASC)]  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- UNION_ALL  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- STREAM_SELECT  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- BTREE_SEARCH  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STABLE_SORT [$$137(ASC)]  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- BTREE_SEARCH  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- REPLICATE  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- STREAM_SELECT  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- BTREE_SEARCH  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STABLE_SORT [$$140(ASC)]  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- BTREE_SEARCH  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- REPLICATE  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_query.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_query.plan
index 67de4c9..fd103f8 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_query.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/union/union_query.plan
@@ -1,9 +1,9 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
-  -- SORT_MERGE_EXCHANGE [$$16(ASC) ]  |PARTITIONED|
+  -- SORT_MERGE_EXCHANGE [$$res(ASC) ]  |PARTITIONED|
     -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$16(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$16]  |PARTITIONED|
+        -- STABLE_SORT [$$res(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$res]  |PARTITIONED|
             -- UNION_ALL  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.1.ddl.sqlpp
new file mode 100644
index 0000000..1d7a8ae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.1.ddl.sqlpp
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type onekType1 as
+ closed {
+  unique1 : bigint,
+  unique2 : bigint,
+  two : bigint,
+  four : bigint,
+  ten : bigint,
+  twenty : bigint,
+  onePercent : bigint,
+  tenPercent : bigint,
+  twentyPercent : bigint,
+  fiftyPercent : bigint,
+  unique3 : bigint,
+  evenOnePercent : bigint,
+  oddOnePercent : bigint,
+  stringu1 : string,
+  stringu2 : string,
+  string4 : string
+};
+
+-- switch unique1 and unique2 in the schema
+
+create type onekType2 as
+ closed {
+  unique2 : bigint,
+  unique1 : bigint,
+  two : bigint,
+  four : bigint,
+  ten : bigint,
+  twenty : bigint,
+  onePercent : bigint,
+  tenPercent : bigint,
+  twentyPercent : bigint,
+  fiftyPercent : bigint,
+  unique3 : bigint,
+  evenOnePercent : bigint,
+  oddOnePercent : bigint,
+  stringu1 : string,
+  stringu2 : string,
+  string4 : string
+};
+
+create type fbuType as
+{
+  id : bigint
+};
+
+create dataset onek1(onekType1) primary key unique2;
+
+create dataset onek2(onekType2) primary key unique2;
+
+create dataset fbu1(fbuType) primary key id;
+
+create dataset fbu2(fbuType) primary key id;
+
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.2.update.sqlpp
similarity index 61%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.2.update.sqlpp
index 092d22e..19d5778 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.2.update.sqlpp
@@ -16,11 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.base;
 
-import java.io.DataOutput;
-import java.io.IOException;
+use test;
 
-public interface IUnnestingPositionWriter {
-    public void write(DataOutput dataOutput, long position) throws IOException;
-}
+load dataset onek1 using localfs ((`path`=`asterix_nc1://data/wisc/onektup.adm`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+
+load dataset onek2 using localfs ((`path`=`asterix_nc1://data/wisc/onektup.adm`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+
+load dataset fbu1 using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
+
+load dataset fbu2 using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.3.ddl.sqlpp
similarity index 76%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.3.ddl.sqlpp
index 092d22e..c34933e 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.3.ddl.sqlpp
@@ -16,11 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.base;
 
-import java.io.DataOutput;
-import java.io.IOException;
+use test;
 
-public interface IUnnestingPositionWriter {
-    public void write(DataOutput dataOutput, long position) throws IOException;
-}
+create index onek1_idx on onek1(unique1);
+
+create index onek2_idx on onek2(unique1);
+
+create index fbu1_idx on fbu1(alias: string);
+
+create index fbu2_idx on fbu2(alias: string);
+
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.4.query.sqlpp
similarity index 76%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.4.query.sqlpp
index 092d22e..453e42c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.4.query.sqlpp
@@ -16,11 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.base;
 
-import java.io.DataOutput;
-import java.io.IOException;
+/**
+ * Test that the index is used (closed field)
+ */
 
-public interface IUnnestingPositionWriter {
-    public void write(DataOutput dataOutput, long position) throws IOException;
-}
+use test;
+
+with T as (
+  select value t1 from onek1 t1
+  union all
+  select value t2 from onek2 t2
+)
+select unique1, unique2, unique3 from T
+where unique1 >= 98
+order by unique2;
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.5.query.sqlpp
similarity index 76%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.5.query.sqlpp
index 092d22e..d99241f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.5.query.sqlpp
@@ -16,11 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.base;
+/**
+ * Test that the index is used (closed field)
+ */
 
-import java.io.DataOutput;
-import java.io.IOException;
+use test;
 
-public interface IUnnestingPositionWriter {
-    public void write(DataOutput dataOutput, long position) throws IOException;
-}
+with T as (
+  select unique1, unique2, unique3 from onek1
+  union all
+  select unique1, unique2, unique3 from onek2
+)
+select value t from T t
+where unique1 >= 98
+order by unique2;
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.6.query.sqlpp
similarity index 76%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.6.query.sqlpp
index 092d22e..0f96940 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.6.query.sqlpp
@@ -16,11 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.base;
+/**
+ * Test that the index is used (open field)
+ */
 
-import java.io.DataOutput;
-import java.io.IOException;
+use test;
 
-public interface IUnnestingPositionWriter {
-    public void write(DataOutput dataOutput, long position) throws IOException;
-}
+with T as (
+  select value t1 from fbu1 t1
+  union all
+  select value t2 from fbu2 t2
+)
+select alias, name from T
+where alias >= "Von"
+order by alias;
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.7.query.sqlpp
similarity index 76%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.7.query.sqlpp
index 092d22e..872c963 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.7.query.sqlpp
@@ -16,11 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.base;
+/**
+ * Test that the index is used (open field)
+ */
 
-import java.io.DataOutput;
-import java.io.IOException;
+use test;
 
-public interface IUnnestingPositionWriter {
-    public void write(DataOutput dataOutput, long position) throws IOException;
-}
+with T as (
+  select alias, name from fbu1
+  union all
+  select alias, name from fbu2
+)
+select value t from T t
+where alias >= "Von"
+order by alias;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.4.adm
new file mode 100644
index 0000000..638ab59
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.4.adm
@@ -0,0 +1,4 @@
+{ "unique1": 98, "unique2": 0, "unique3": 0 }
+{ "unique1": 98, "unique2": 12, "unique3": 98 }
+{ "unique1": 99, "unique2": 25, "unique3": 25 }
+{ "unique1": 99, "unique2": 97, "unique3": 99 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.5.adm
new file mode 100644
index 0000000..638ab59
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.5.adm
@@ -0,0 +1,4 @@
+{ "unique1": 98, "unique2": 0, "unique3": 0 }
+{ "unique1": 98, "unique2": 12, "unique3": 98 }
+{ "unique1": 99, "unique2": 25, "unique3": 25 }
+{ "unique1": 99, "unique2": 97, "unique3": 99 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.6.adm
new file mode 100644
index 0000000..355836b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.6.adm
@@ -0,0 +1,6 @@
+{ "alias": "Von", "name": "VonKemble" }
+{ "alias": "Von", "name": "VonKemble" }
+{ "alias": "Willis", "name": "WillisWynne" }
+{ "alias": "Willis", "name": "WillisWynne" }
+{ "alias": "Woodrow", "name": "WoodrowNehling" }
+{ "alias": "Woodrow", "name": "WoodrowNehling" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.7.adm
new file mode 100644
index 0000000..355836b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.7.adm
@@ -0,0 +1,6 @@
+{ "alias": "Von", "name": "VonKemble" }
+{ "alias": "Von", "name": "VonKemble" }
+{ "alias": "Willis", "name": "WillisWynne" }
+{ "alias": "Willis", "name": "WillisWynne" }
+{ "alias": "Woodrow", "name": "WoodrowNehling" }
+{ "alias": "Woodrow", "name": "WoodrowNehling" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 5eb0291..883a9e3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -13463,6 +13463,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="union">
+      <compilation-unit name="union_opt_1">
+        <output-dir compare="Text">union_opt_1</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="union">
       <compilation-unit name="union_orderby">
         <output-dir compare="Text">union_orderby</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/base/IDataFormat.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/base/IDataFormat.java
index 6906ae4..cab8c22 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/base/IDataFormat.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/base/IDataFormat.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
@@ -66,6 +67,8 @@
 
     public IMissingWriterFactory getMissingWriterFactory();
 
+    public IUnnestingPositionWriterFactory getUnnestingPositionWriterFactory();
+
     public Triple<IScalarEvaluatorFactory, ScalarFunctionCallExpression, IAType> partitioningEvaluatorFactory(
             IFunctionManager functionManager, ARecordType recType, List<String> fldName, SourceLocation sourceLoc)
             throws AlgebricksException;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index cf3ea0e..268c1f6 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -1577,7 +1577,7 @@
     public static final FunctionIdentifier TREAT_AS_INTEGER =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "treat-as-integer", 1);
     public static final FunctionIdentifier IS_NUMERIC_ADD_COMPATIBLE =
-            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "is-numeric-add-compatibe", 1);
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "is-numeric-add-compatible", 1);
 
     public static final FunctionIdentifier EXTERNAL_LOOKUP =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "external-lookup", FunctionIdentifier.VARARGS);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/base/UnnestingPositionWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/base/UnnestingPositionWriter.java
new file mode 100644
index 0000000..b8bfa7e
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/base/UnnestingPositionWriter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.base;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.data.IUnnestingPositionWriter;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+@NotThreadSafe
+final class UnnestingPositionWriter implements IUnnestingPositionWriter {
+
+    private final AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt64> aInt64Serde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+
+    @Override
+    public void write(DataOutput dataOutput, long position) throws IOException {
+        aInt64.setValue(position);
+        aInt64Serde.serialize(aInt64, dataOutput);
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/PositionWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/base/UnnestingPositionWriterFactory.java
similarity index 61%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/PositionWriter.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/base/UnnestingPositionWriterFactory.java
index 6f8b64f..a662ac0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/PositionWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/base/UnnestingPositionWriterFactory.java
@@ -16,22 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.translator;
 
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
+package org.apache.asterix.runtime.base;
 
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
+import org.apache.hyracks.algebricks.data.IUnnestingPositionWriter;
+import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
 
-public class PositionWriter implements IUnnestingPositionWriter, Serializable {
+public final class UnnestingPositionWriterFactory implements IUnnestingPositionWriterFactory {
+
     private static final long serialVersionUID = 1L;
 
-    @Override
-    public void write(DataOutput dataOutput, long position) throws IOException {
-        dataOutput.writeByte(BuiltinType.AINT64.getTypeTag().serialize());
-        dataOutput.writeLong(position);
+    public static final UnnestingPositionWriterFactory INSTANCE = new UnnestingPositionWriterFactory();
+
+    private UnnestingPositionWriterFactory() {
     }
 
+    @Override
+    public IUnnestingPositionWriter createUnnestingPositionWriter() {
+        return new UnnestingPositionWriter();
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java
index 1fc200c..31a93dc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -50,6 +50,7 @@
 import org.apache.asterix.om.functions.IFunctionManager;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.base.UnnestingPositionWriterFactory;
 import org.apache.asterix.runtime.evaluators.common.CreateMBREvalFactory;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -73,6 +74,7 @@
 import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
 import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory;
@@ -341,6 +343,11 @@
     }
 
     @Override
+    public IUnnestingPositionWriterFactory getUnnestingPositionWriterFactory() {
+        return UnnestingPositionWriterFactory.INSTANCE;
+    }
+
+    @Override
     public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
         return new IExpressionEvalSizeComputer() {
             @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/ListSet.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/ListSet.java
index 45cd32a..3f2170a 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/ListSet.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/ListSet.java
@@ -25,7 +25,7 @@
 import java.util.Set;
 
 public class ListSet<E> implements Set<E> {
-    private List<E> list = new ArrayList<E>();
+    private final List<E> list = new ArrayList<E>();
 
     public ListSet() {
     }
@@ -36,17 +36,19 @@
 
     @Override
     public boolean add(E arg0) {
-        if (list.contains(arg0))
+        if (list.contains(arg0)) {
             return false;
+        }
         return list.add(arg0);
     }
 
     @Override
     public boolean addAll(Collection<? extends E> arg0) {
-        for (E item : arg0)
-            if (list.contains(item))
-                return false;
-        return list.addAll(arg0);
+        boolean modified = false;
+        for (E item : arg0) {
+            modified |= add(item);
+        }
+        return modified;
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index dbc859c..3f03cc8 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -65,6 +66,7 @@
     protected IConflictingTypeResolver conflictingTypeResolver;
     protected IExpressionEvalSizeComputer expressionEvalSizeComputer;
     protected IMissingWriterFactory missingWriterFactory;
+    protected IUnnestingPositionWriterFactory unnestingPositionWriterFactory;
     protected INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
     protected IPartialAggregationTypeComputer partialAggregationTypeComputer;
     protected IMergeAggregationExpressionFactory mergeAggregationExpressionFactory;
@@ -187,6 +189,14 @@
         return missingWriterFactory;
     }
 
+    public void setUnnestingPositionWriterFactory(IUnnestingPositionWriterFactory unnestingPositionWriterFactory) {
+        this.unnestingPositionWriterFactory = unnestingPositionWriterFactory;
+    }
+
+    public IUnnestingPositionWriterFactory getUnnestingPositionWriterFactory() {
+        return unnestingPositionWriterFactory;
+    }
+
     public void setExpressionEvalSizeComputer(IExpressionEvalSizeComputer expressionEvalSizeComputer) {
         this.expressionEvalSizeComputer = expressionEvalSizeComputer;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 0fd107f..49be31c 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -102,10 +102,11 @@
                                 serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider,
                                 comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory,
                                 binaryIntegerInspectorFactory, printerProvider, missingWriterFactory,
-                                normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer,
-                                oc, expressionEvalSizeComputer, partialAggregationTypeComputer,
-                                predEvaluatorFactoryProvider, physicalOptimizationConfig.getFrameSize(),
-                                clusterLocations, warningCollector, maxWarnings);
+                                unnestingPositionWriterFactory, normalizedKeyComputerFactoryProvider,
+                                expressionRuntimeProvider, expressionTypeComputer, oc, expressionEvalSizeComputer,
+                                partialAggregationTypeComputer, predEvaluatorFactoryProvider,
+                                physicalOptimizationConfig.getFrameSize(), clusterLocations, warningCollector,
+                                maxWarnings);
 
                         PlanCompiler pc = new PlanCompiler(context);
                         return pc.compilePlan(plan, jobEventListenerFactory);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index f0972b7..5f61c22 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -103,6 +103,10 @@
       <artifactId>jackson-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java
index c2d2cfa..5fc69b2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java
@@ -66,6 +66,11 @@
         return variableTypes;
     }
 
+    @Override
+    public boolean isMap() {
+        return true;
+    }
+
     /**
      * If propagateInput is true, then propagates the input variables.
      */
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
index 1659c46..5c485d5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
@@ -23,22 +23,15 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
 
 public abstract class AbstractUnnestNonMapOperator extends AbstractUnnestOperator {
 
     protected LogicalVariable positionalVariable;
 
     /**
-     * Specify the writer of the positional variable
-     */
-    protected IUnnestingPositionWriter positionWriter;
-
-    /**
      * Specify the type of the positional variable
      */
     protected Object positionalVariableType;
@@ -48,12 +41,10 @@
     }
 
     public AbstractUnnestNonMapOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression,
-            LogicalVariable positionalVariable, Object positionalVariableType,
-            IUnnestingPositionWriter positionWriter) {
+            LogicalVariable positionalVariable, Object positionalVariableType) {
         this(variable, expression);
         this.setPositionalVariable(positionalVariable);
         this.setPositionalVariableType(positionalVariableType);
-        this.setPositionWriter(positionWriter);
     }
 
     public LogicalVariable getVariable() {
@@ -68,12 +59,8 @@
         return positionalVariable;
     }
 
-    public void setPositionWriter(IUnnestingPositionWriter positionWriter) {
-        this.positionWriter = positionWriter;
-    }
-
-    public IUnnestingPositionWriter getPositionWriter() {
-        return positionalVariable != null ? positionWriter : null;
+    public boolean hasPositionalVariable() {
+        return positionalVariable != null;
     }
 
     public void setPositionalVariableType(Object positionalVariableType) {
@@ -84,6 +71,13 @@
         return positionalVariableType;
     }
 
+    @Override
+    public boolean isMap() {
+        //TODO(dmitry): unnest with positional variable is not a 'map'
+        //need to return !hasPositionalVariable();
+        return true;
+    }
+
     protected static <E> List<E> makeSingletonList(E item) {
         List<E> array = new ArrayList<>(1);
         array.add(item);
@@ -95,8 +89,7 @@
         return new VariablePropagationPolicy() {
 
             @Override
-            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
-                    throws AlgebricksException {
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) {
                 if (sources.length > 0) {
                     target.addAllVariables(sources[0]);
                 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestOperator.java
index 1b07e2f..327d797 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestOperator.java
@@ -40,11 +40,6 @@
     }
 
     @Override
-    public boolean isMap() {
-        return true;
-    }
-
-    @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
         return visitor.transform(expression);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestOperator.java
index bdc3227..8c95a3f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestOperator.java
@@ -26,7 +26,6 @@
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
 
 public class LeftOuterUnnestOperator extends AbstractUnnestNonMapOperator {
 
@@ -35,9 +34,8 @@
     }
 
     public LeftOuterUnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression,
-            LogicalVariable positionalVariable, Object positionalVariableType,
-            IUnnestingPositionWriter positionWriter) {
-        super(variable, expression, positionalVariable, positionalVariableType, positionWriter);
+            LogicalVariable positionalVariable, Object positionalVariableType) {
+        super(variable, expression, positionalVariable, positionalVariableType);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java
index ba5ef5a..45c6e2f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java
@@ -35,7 +35,7 @@
 public class UnionAllOperator extends AbstractLogicalOperator {
 
     // (left-var, right-var, out-var)
-    private List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap;
+    private final List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap;
 
     public UnionAllOperator(List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap) {
         this.varMap = varMap;
@@ -112,31 +112,21 @@
             inputSchemaIdx = 1;
         }
 
-        schema = new ArrayList<>(inputSchema.size());
-        for (LogicalVariable inVar : inputSchema) {
-            LogicalVariable outVar = findOutputVar(inVar, inputSchemaIdx);
-            schema.add(outVar != null ? outVar : inVar);
-        }
-    }
-
-    private LogicalVariable findOutputVar(LogicalVariable inputVar, int inputIdx) {
+        schema = new ArrayList<>(inputSchema);
         for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : varMap) {
-            LogicalVariable testVar;
-            switch (inputIdx) {
-                case 0:
-                    testVar = t.first;
-                    break;
-                case 1:
-                    testVar = t.second;
-                    break;
-                default:
-                    throw new IllegalArgumentException(String.valueOf(inputIdx));
+            LogicalVariable inVar = inputSchemaIdx == 0 ? t.first : t.second;
+            LogicalVariable outVar = t.third;
+            boolean mappingFound = false;
+            for (int i = 0, n = schema.size(); i < n; i++) {
+                if (schema.get(i).equals(inVar)) {
+                    schema.set(i, outVar);
+                    mappingFound = true;
+                }
             }
-            if (inputVar.equals(testVar)) {
-                return t.third;
+            if (!mappingFound) {
+                schema.add(outVar);
             }
         }
-        return null;
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index e90298d..face56e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -26,7 +26,6 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
 
 public class UnnestOperator extends AbstractUnnestNonMapOperator {
 
@@ -35,9 +34,8 @@
     }
 
     public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression,
-            LogicalVariable positionalVariable, Object positionalVariableType,
-            IUnnestingPositionWriter positionWriter) {
-        super(variable, expression, positionalVariable, positionalVariableType, positionWriter);
+            LogicalVariable positionalVariable, Object positionalVariableType) {
+        super(variable, expression, positionalVariable, positionalVariableType);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 1c02a6c..2cdcaa2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -22,6 +22,7 @@
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -491,29 +492,18 @@
 
     @Override
     public ILogicalOperator visitUnionOperator(UnionAllOperator op, ILogicalOperator arg) throws AlgebricksException {
-        List<Mutable<ILogicalOperator>> copiedInputs = new ArrayList<>();
-        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
-            copiedInputs.add(deepCopyOperatorReference(childRef, null));
-        }
-        List<List<LogicalVariable>> liveVarsInInputs = new ArrayList<>();
-        for (Mutable<ILogicalOperator> inputOpRef : copiedInputs) {
-            List<LogicalVariable> liveVars = new ArrayList<>();
-            VariableUtilities.getLiveVariables(inputOpRef.getValue(), liveVars);
-            liveVarsInInputs.add(liveVars);
-        }
-        List<LogicalVariable> liveVarsInLeftInput = liveVarsInInputs.get(0);
-        List<LogicalVariable> liveVarsInRightInput = liveVarsInInputs.get(1);
-        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> copiedTriples = new ArrayList<>();
-        int index = 0;
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> variableMappingsCopy =
+                new ArrayList<>(op.getVariableMappings().size());
+        UnionAllOperator opCopy = new UnionAllOperator(variableMappingsCopy);
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : op.getVariableMappings()) {
             LogicalVariable producedVar = deepCopyVariable(triple.third);
+            LogicalVariable newLeftVar = Objects.requireNonNull(inputVarToOutputVarMapping.get(triple.first));
+            LogicalVariable newRightVar = Objects.requireNonNull(inputVarToOutputVarMapping.get(triple.second));
             Triple<LogicalVariable, LogicalVariable, LogicalVariable> copiedTriple =
-                    new Triple<>(liveVarsInLeftInput.get(index), liveVarsInRightInput.get(index), producedVar);
-            copiedTriples.add(copiedTriple);
-            ++index;
+                    new Triple<>(newLeftVar, newRightVar, producedVar);
+            variableMappingsCopy.add(copiedTriple);
         }
-        UnionAllOperator opCopy = new UnionAllOperator(copiedTriples);
-        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         return opCopy;
     }
 
@@ -565,7 +555,7 @@
     public ILogicalOperator visitUnnestOperator(UnnestOperator op, ILogicalOperator arg) throws AlgebricksException {
         UnnestOperator opCopy = new UnnestOperator(deepCopyVariable(op.getVariable()),
                 exprDeepCopyVisitor.deepCopyExpressionReference(op.getExpressionRef()),
-                deepCopyVariable(op.getPositionalVariable()), op.getPositionalVariableType(), op.getPositionWriter());
+                deepCopyVariable(op.getPositionalVariable()), op.getPositionalVariableType());
         deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         return opCopy;
     }
@@ -602,7 +592,7 @@
             throws AlgebricksException {
         LeftOuterUnnestOperator opCopy = new LeftOuterUnnestOperator(deepCopyVariable(op.getVariable()),
                 exprDeepCopyVisitor.deepCopyExpressionReference(op.getExpressionRef()),
-                deepCopyVariable(op.getPositionalVariable()), op.getPositionalVariableType(), op.getPositionWriter());
+                deepCopyVariable(op.getPositionalVariable()), op.getPositionalVariableType());
         deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         return opCopy;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 62ea79d..e25c8c4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -227,7 +227,7 @@
     @Override
     public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
         return new UnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()),
-                op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter());
+                op.getPositionalVariable(), op.getPositionalVariableType());
     }
 
     @Override
@@ -412,7 +412,7 @@
     public ILogicalOperator visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg)
             throws AlgebricksException {
         return new LeftOuterUnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()),
-                op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter());
+                op.getPositionalVariable(), op.getPositionalVariableType());
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
index 5d54e1d..90e88dc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -81,8 +82,10 @@
         IUnnestingEvaluatorFactory unnestingFactory = expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
         int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
+        IUnnestingPositionWriterFactory positionWriterFactory =
+                unnest.hasPositionalVariable() ? context.getUnnestingPositionWriterFactory() : null;
         UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
-                unnest.getPositionWriter(), leftOuter, context.getMissingWriterFactory());
+                positionWriterFactory, leftOuter, context.getMissingWriterFactory());
         unnestRuntime.setSourceLocation(unnest.getSourceLocation());
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index b8fe5c7..9ca2f6e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -21,6 +21,7 @@
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
@@ -463,7 +464,7 @@
         return -1;
     }
 
-    public static List<Mutable<ILogicalExpression>> createVariableReferences(List<LogicalVariable> varList,
+    public static List<Mutable<ILogicalExpression>> createVariableReferences(Collection<LogicalVariable> varList,
             SourceLocation sourceLoc) {
         List<Mutable<ILogicalExpression>> varRefs = new ArrayList<>(varList.size());
         for (LogicalVariable var : varList) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 4bc5689..41cde2e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -43,6 +43,7 @@
 import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -57,7 +58,8 @@
     private final IPrinterFactoryProvider printerFactoryProvider;
     private final ITypeTraitProvider typeTraitProvider;
     private final IMetadataProvider<?, ?> metadataProvider;
-    private final IMissingWriterFactory nonMatchWriterFactory;
+    private final IMissingWriterFactory missingWriterFactory;
+    private final IUnnestingPositionWriterFactory unnestingPositionWriterFactory;
     private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
     private final Object appContext;
     private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
@@ -81,7 +83,7 @@
             IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
             IBinaryBooleanInspectorFactory booleanInspectorFactory,
             IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
-            IMissingWriterFactory nullWriterFactory,
+            IMissingWriterFactory missingWriterFactory, IUnnestingPositionWriterFactory unnestingPositionWriterFactory,
             INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
             IExpressionRuntimeProvider expressionRuntimeProvider, IExpressionTypeComputer expressionTypeComputer,
             ITypingContext typingContext, IExpressionEvalSizeComputer expressionEvalSizeComputer,
@@ -102,7 +104,8 @@
         this.printerFactoryProvider = printerFactoryProvider;
         this.clusterLocations = clusterLocations;
         this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
-        this.nonMatchWriterFactory = nullWriterFactory;
+        this.missingWriterFactory = missingWriterFactory;
+        this.unnestingPositionWriterFactory = unnestingPositionWriterFactory;
         this.expressionRuntimeProvider = expressionRuntimeProvider;
         this.expressionTypeComputer = expressionTypeComputer;
         this.typingContext = typingContext;
@@ -190,7 +193,11 @@
     }
 
     public IMissingWriterFactory getMissingWriterFactory() {
-        return nonMatchWriterFactory;
+        return missingWriterFactory;
+    }
+
+    public IUnnestingPositionWriterFactory getUnnestingPositionWriterFactory() {
+        return unnestingPositionWriterFactory;
     }
 
     public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java
index 9bb620b..dc97f3b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java
@@ -19,38 +19,64 @@
 package org.apache.hyracks.algebricks.core.jobgen.impl;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+
 public class OperatorSchemaImpl implements IOperatorSchema {
-    private final Map<LogicalVariable, Integer> varMap;
+
+    private final Object2IntMap<LogicalVariable> varMap;
 
     private final List<LogicalVariable> varList;
 
     public OperatorSchemaImpl() {
-        varMap = new HashMap<LogicalVariable, Integer>();
-        varList = new ArrayList<LogicalVariable>();
+        varMap = new Object2IntOpenHashMap<>();
+        varList = new ArrayList<>();
     }
 
     @Override
     public void addAllVariables(IOperatorSchema source) {
         for (LogicalVariable v : source) {
-            varMap.put(v, varList.size());
-            varList.add(v);
+            addVariable(v);
         }
     }
 
     @Override
     public void addAllNewVariables(IOperatorSchema source) {
-        for (LogicalVariable v : source) {
-            if (varMap.get(v) == null) {
-                varMap.put(v, varList.size());
-                varList.add(v);
+        // source schema can contain the same variable more than once,
+        // we need to add it as many times if this variable is new.
+        int sourceLen = source.getSize();
+        BitSet newVarIdxs = null;
+        for (int i = 0; i < sourceLen; i++) {
+            LogicalVariable v = source.getVariable(i);
+            if (!varMap.containsKey(v)) {
+                addVariable(v);
+                if (newVarIdxs == null) {
+                    newVarIdxs = new BitSet(sourceLen);
+                }
+                newVarIdxs.set(i);
+            } else {
+                // if varMap contains this variable then we need to differentiate between two cases:
+                // 1. the variable was present before this method was called, therefore it's not new
+                // 2. the variable was not present before this method was called, but was added earlier in this loop
+                //    (i.e. it is a duplicate entry of the same variable in the source schema)
+                //    in this case it is considered new and we need to add it to varMap
+                if (newVarIdxs != null) {
+                    for (int j = newVarIdxs.nextSetBit(0); j >= 0; j = newVarIdxs.nextSetBit(j + 1)) {
+                        if (v.equals(source.getVariable(j))) {
+                            // this variable was previously added as 'new'
+                            // we need to add a duplicate entry for this variable
+                            addVariable(v);
+                            break;
+                        }
+                    }
+                }
             }
         }
     }
@@ -95,6 +121,6 @@
 
     @Override
     public String toString() {
-        return varMap.toString();
+        return varMap + " " + varList;
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IUnnestingPositionWriter.java
similarity index 87%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
rename to hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IUnnestingPositionWriter.java
index 092d22e..935665e 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
+++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IUnnestingPositionWriter.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.base;
+package org.apache.hyracks.algebricks.data;
 
 import java.io.DataOutput;
 import java.io.IOException;
 
 public interface IUnnestingPositionWriter {
-    public void write(DataOutput dataOutput, long position) throws IOException;
+    void write(DataOutput dataOutput, long position) throws IOException;
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IUnnestingPositionWriterFactory.java
similarity index 77%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
copy to hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IUnnestingPositionWriterFactory.java
index 092d22e..164ea99 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
+++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IUnnestingPositionWriterFactory.java
@@ -16,11 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.base;
 
-import java.io.DataOutput;
-import java.io.IOException;
+package org.apache.hyracks.algebricks.data;
 
-public interface IUnnestingPositionWriter {
-    public void write(DataOutput dataOutput, long position) throws IOException;
-}
+import java.io.Serializable;
+
+@FunctionalInterface
+public interface IUnnestingPositionWriterFactory extends Serializable {
+    IUnnestingPositionWriter createUnnestingPositionWriter();
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index f9e6b13..bd24644 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -29,6 +29,7 @@
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -217,19 +218,23 @@
                 if (ref.equals(candidate)) {
                     continue;
                 }
-                ArrayList<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
-                Map<LogicalVariable, LogicalVariable> variableMappingBack =
-                        new HashMap<LogicalVariable, LogicalVariable>();
+                ListSet<LogicalVariable> liveVarsNewSet = new ListSet<>(liveVarsNew);
+                List<LogicalVariable> liveVars = new ArrayList<>(liveVarsNew.size());
+                ListSet<LogicalVariable> liveVarsSet = new ListSet<>();
+                Map<LogicalVariable, LogicalVariable> variableMappingBack = new HashMap<>();
                 IsomorphismUtilities.mapVariablesTopDown(ref.getValue(), candidate.getValue(), variableMappingBack);
-                for (int i = 0; i < liveVarsNew.size(); i++) {
-                    liveVars.add(variableMappingBack.get(liveVarsNew.get(i)));
+                for (LogicalVariable liveVarNew : liveVarsNew) {
+                    liveVars.add(variableMappingBack.get(liveVarNew));
+                }
+                for (LogicalVariable liveVarNew : liveVarsNewSet) {
+                    liveVarsSet.add(variableMappingBack.get(liveVarNew));
                 }
 
                 SourceLocation refSourceLoc = ref.getValue().getSourceLocation();
 
                 List<Mutable<ILogicalExpression>> assignExprs =
-                        OperatorManipulationUtil.createVariableReferences(liveVarsNew, candidateSourceLoc);
-                AbstractLogicalOperator assignOperator = new AssignOperator(liveVars, assignExprs);
+                        OperatorManipulationUtil.createVariableReferences(liveVarsNewSet, candidateSourceLoc);
+                AbstractLogicalOperator assignOperator = new AssignOperator(new ArrayList<>(liveVarsSet), assignExprs);
                 assignOperator.setSourceLocation(refSourceLoc);
                 assignOperator.setExecutionMode(rop.getExecutionMode());
                 assignOperator.setPhysicalOperator(new AssignPOperator());
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
deleted file mode 100644
index 145761c..0000000
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.rewriter.rules;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-/**
- * Pushes an AssignOperator below a UnionAll operator by creating an new AssignOperator below each of
- * the UnionAllOperator's branches with appropriate variable replacements.
- * This rule can help to enable other rules that are difficult to fire across a UnionAllOperator,
- * for example, eliminating common sub-expressions.
- * Example:
- * Before plan:
- * ...
- * assign [$$20, $$21] <- [funcA($$3), funcB($$6)]
- * union ($$1, $$2, $$3) ($$4, $$5, $$6)
- * union_branch_0
- * ...
- * union_branch_1
- * ...
- * After plan:
- * ...
- * union ($$1, $$2, $$3) ($$4, $$5, $$6) ($$22, $$24, $$20) ($$23, $$25, $$21)
- * assign [$$22, $$23] <- [funcA($$1), funcB($$4)]
- * union_branch_0
- * ...
- * assign [$$24, $$25] <- [funcA($$2), funcB($$5)]
- * union_branch_1
- * ...
- */
-public class PushAssignBelowUnionAllRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        return false;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (!op.hasInputs()) {
-            return false;
-        }
-
-        boolean modified = false;
-        inputs_loop: for (int i = 0; i < op.getInputs().size(); i++) {
-            AbstractLogicalOperator childOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
-            if (childOp.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
-                continue;
-            }
-            AssignOperator assignOp = (AssignOperator) childOp;
-            for (Mutable<ILogicalExpression> expr : assignOp.getExpressions()) {
-                if (!expr.getValue().isFunctional()) {
-                    continue inputs_loop;
-                }
-            }
-
-            AbstractLogicalOperator childOfChildOp = (AbstractLogicalOperator) assignOp.getInputs().get(0).getValue();
-            if (childOfChildOp.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
-                continue;
-            }
-            UnionAllOperator unionOp = (UnionAllOperator) childOfChildOp;
-            Set<LogicalVariable> assignUsedVars = new HashSet<>();
-            VariableUtilities.getUsedVariables(assignOp, assignUsedVars);
-            List<LogicalVariable> assignVars = assignOp.getVariables();
-            AssignOperator[] newAssignOps = new AssignOperator[2];
-            for (int j = 0; j < unionOp.getInputs().size(); j++) {
-                newAssignOps[j] = createAssignBelowUnionAllBranch(unionOp, j, assignOp, assignUsedVars, context);
-                if (newAssignOps[j] == null) {
-                    continue inputs_loop;
-                }
-            }
-            // Add original assign variables to the union variable mappings.
-            for (int j = 0; j < assignVars.size(); j++) {
-                LogicalVariable first = newAssignOps[0].getVariables().get(j);
-                LogicalVariable second = newAssignOps[1].getVariables().get(j);
-                unionOp.getVariableMappings().add(new Triple<>(first, second, assignVars.get(j)));
-            }
-            context.computeAndSetTypeEnvironmentForOperator(unionOp);
-            // Remove original assign operator.
-            op.getInputs().set(i, assignOp.getInputs().get(0));
-            context.computeAndSetTypeEnvironmentForOperator(op);
-            modified = true;
-        }
-
-        return modified;
-    }
-
-    private AssignOperator createAssignBelowUnionAllBranch(UnionAllOperator unionOp, int inputIndex,
-            AssignOperator originalAssignOp, Set<LogicalVariable> assignUsedVars, IOptimizationContext context)
-            throws AlgebricksException {
-        AssignOperator newAssignOp = cloneAssignOperator(originalAssignOp, context, unionOp, inputIndex);
-        if (newAssignOp == null) {
-            return null;
-        }
-        newAssignOp.getInputs().add(new MutableObject<>(unionOp.getInputs().get(inputIndex).getValue()));
-        unionOp.getInputs().get(inputIndex).setValue(newAssignOp);
-        int numVarMappings = unionOp.getVariableMappings().size();
-        for (int i = 0; i < numVarMappings; i++) {
-            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = unionOp.getVariableMappings().get(i);
-            if (assignUsedVars.contains(varMapping.third)) {
-                LogicalVariable replacementVar;
-                if (inputIndex == 0) {
-                    replacementVar = varMapping.first;
-                } else {
-                    replacementVar = varMapping.second;
-                }
-                VariableUtilities.substituteVariables(newAssignOp, varMapping.third, replacementVar, context);
-            }
-        }
-        context.computeAndSetTypeEnvironmentForOperator(newAssignOp);
-        return newAssignOp;
-    }
-
-    /**
-     * Clones the given assign operator changing the returned variables to be new ones.
-     * Also, leaves the inputs of the clone clear. It returns null if the assign operator cannot be pushed.
-     */
-    private AssignOperator cloneAssignOperator(AssignOperator assignOp, IOptimizationContext context,
-            UnionAllOperator unionOp, int inputIndex) throws AlgebricksException {
-        List<LogicalVariable> vars = new ArrayList<>();
-        List<Mutable<ILogicalExpression>> exprs = new ArrayList<>();
-        int numVars = assignOp.getVariables().size();
-        for (int i = 0; i < numVars; i++) {
-            vars.add(context.newVar());
-            ILogicalExpression clonedExpression = assignOp.getExpressions().get(i).getValue().cloneExpression();
-            if (!modifyExpression(clonedExpression, unionOp, context, inputIndex)) {
-                return null; // bail if the expression couldn't be modified according to the branch it is moved to
-            }
-            exprs.add(new MutableObject<>(clonedExpression));
-        }
-        AssignOperator assignCloneOp = new AssignOperator(vars, exprs);
-        assignCloneOp.setSourceLocation(assignOp.getSourceLocation());
-        assignCloneOp.setExecutionMode(assignOp.getExecutionMode());
-        return assignCloneOp;
-    }
-
-    // modifies the cloned expression according to the branch it'll be moved to. returns true if successful.
-    protected boolean modifyExpression(ILogicalExpression expression, UnionAllOperator unionOp,
-            IOptimizationContext ctx, int inputIndex) throws AlgebricksException {
-        // default implementation does not check specific expressions
-        return true;
-    }
-}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
new file mode 100644
index 0000000..3c468be
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
+/**
+ * @author kereno, ecarm002, ildar.absalyamov
+ *         Pushes down 'map' operator through both branches of the union-all operator
+ *         Before rule:
+ *         ============
+ *         map_op
+ *         union-all (left_branch, right_branch, res)
+ *         left_branch
+ *         left_op_1
+ *         ...
+ *         right_branch
+ *         right_op_1
+ *
+ *         After rule:
+ *         ============
+ *         union-all (left_branch, right_branch, res)
+ *         left_branch
+ *         map_op
+ *         left_op_1
+ *         right_branch
+ *         map_op
+ *         right_op_1
+ */
+public abstract class PushMapOperatorThroughUnionRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    protected boolean isOperatorKindPushableThroughUnion(ILogicalOperator op) {
+        return op.isMap();
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+
+        boolean pushable = isOperatorKindPushableThroughUnion(op) && OperatorPropertiesUtil.isMovable(op);
+        if (!pushable) {
+            return false;
+        }
+
+        Mutable<ILogicalOperator> inputOpRef = op.getInputs().get(0);
+        AbstractLogicalOperator inputOp = (AbstractLogicalOperator) inputOpRef.getValue();
+        if (inputOp.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
+            return false;
+        }
+        UnionAllOperator unionAllOp = (UnionAllOperator) inputOp;
+
+        List<LogicalVariable> opProducedVars = new ArrayList<>();
+        VariableUtilities.getProducedVariables(op, opProducedVars);
+
+        Set<LogicalVariable> opUsedVars = new HashSet<>();
+        VariableUtilities.getUsedVariables(op, opUsedVars);
+
+        Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> leftBranchPair =
+                insertIntoBranch(op, opUsedVars, unionAllOp, 0, context);
+        if (leftBranchPair == null) {
+            return false;
+        }
+        ILogicalOperator leftBranchRootOp = leftBranchPair.first;
+        //  < original produced var, new produced variable in left branch >
+        Map<LogicalVariable, LogicalVariable> leftBranchProducedVarMap = leftBranchPair.second;
+
+        Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> rightBranchPair =
+                insertIntoBranch(op, opUsedVars, unionAllOp, 1, context);
+        if (rightBranchPair == null) {
+            return false;
+        }
+        ILogicalOperator rightBranchRootOp = rightBranchPair.first;
+        // < original produced var, new produced variable in right branch >
+        Map<LogicalVariable, LogicalVariable> rightBranchProducedVarMap = rightBranchPair.second;
+
+        UnionAllOperator newUnionAllOp = (UnionAllOperator) OperatorManipulationUtil.deepCopy(unionAllOp);
+        newUnionAllOp.getInputs().add(new MutableObject<>(leftBranchRootOp));
+        newUnionAllOp.getInputs().add(new MutableObject<>(rightBranchRootOp));
+
+        for (LogicalVariable opProducedVar : opProducedVars) {
+            LogicalVariable leftBranchProducedVar = leftBranchProducedVarMap.get(opProducedVar);
+            if (leftBranchProducedVar == null) {
+                throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+                        "Cannot find " + opProducedVar);
+            }
+            LogicalVariable rightBranchProducedVar = rightBranchProducedVarMap.get(opProducedVar);
+            if (rightBranchProducedVar == null) {
+                throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+                        "Cannot find " + opProducedVar);
+            }
+
+            newUnionAllOp.getVariableMappings()
+                    .add(new Triple<>(leftBranchProducedVar, rightBranchProducedVar, opProducedVar));
+        }
+
+        context.computeAndSetTypeEnvironmentForOperator(newUnionAllOp);
+        opRef.setValue(newUnionAllOp);
+
+        return true;
+    }
+
+    private Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> insertIntoBranch(AbstractLogicalOperator op,
+            Set<LogicalVariable> opUsedVars, UnionAllOperator unionAllOp, int branchIdx, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator branchRootOp = unionAllOp.getInputs().get(branchIdx).getValue();
+        Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> newBranchRootOpPair =
+                deepCopyForBranch(op, opUsedVars, unionAllOp, branchIdx, context);
+        if (newBranchRootOpPair == null) {
+            return null;
+        }
+
+        ILogicalOperator newBranchRootOp = newBranchRootOpPair.first;
+        newBranchRootOp.getInputs().get(0).setValue(branchRootOp);
+        context.computeAndSetTypeEnvironmentForOperator(newBranchRootOp);
+
+        // [ operator, < original produced var, new variable (produced in branch) > ]
+        return newBranchRootOpPair;
+    }
+
+    protected Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyForBranch(ILogicalOperator op,
+            Set<LogicalVariable> opUsedVars, UnionAllOperator unionAllOp, int branchIdx, IOptimizationContext context)
+            throws AlgebricksException {
+        // < union-output-var, union-branch-var >
+        LinkedHashMap<LogicalVariable, LogicalVariable> usedVarsMapping = new LinkedHashMap<>();
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : unionAllOp.getVariableMappings()) {
+            LogicalVariable unionOutputVar = t.third;
+            if (opUsedVars.contains(unionOutputVar)) {
+                LogicalVariable branchVar = branchIdx == 0 ? t.first : t.second;
+                usedVarsMapping.put(unionOutputVar, branchVar);
+            }
+        }
+
+        LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor =
+                new LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null, usedVarsMapping, true);
+        ILogicalOperator newOp = deepCopyVisitor.deepCopy(op);
+
+        return new Pair<>(newOp, deepCopyVisitor.getInputToOutputVariableMapping());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushUnnestDownThroughUnionRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushUnnestDownThroughUnionRule.java
deleted file mode 100644
index 434faff..0000000
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushUnnestDownThroughUnionRule.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.algebricks.rewriter.rules;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-/**
- * @author kereno, ecarm002, ildar.absalyamov
- *         Pushes down unnest through both branches of the union operator
- *         Before rule:
- *         ============
- *         unnest
- *         union (left_branch, right_branch, res)
- *         left_branch
- *         right_branch
- *         After rule:
- *         ============
- *         union (left_branch, right_branch, res)
- *         unnest
- *         left_branch
- *         unnest
- *         right_branch
- */
-public class PushUnnestDownThroughUnionRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
-        return false;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-
-        AbstractLogicalOperator unnest = (AbstractLogicalOperator) opRef.getValue();
-        if (unnest.getOperatorTag() != LogicalOperatorTag.UNNEST) {
-            return false;
-        }
-        UnnestOperator unnestOpRef = (UnnestOperator) opRef.getValue();
-        Mutable<ILogicalOperator> unionOp = unnest.getInputs().get(0);
-
-        AbstractLogicalOperator unionAbstractOp = (AbstractLogicalOperator) unionOp.getValue();
-        if (unionAbstractOp.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
-            return false;
-        }
-
-        LogicalVariable unnestVar1 = context.newVar();
-        UnnestOperator unnest1 = new UnnestOperator(unnestVar1,
-                new MutableObject<ILogicalExpression>(unnestOpRef.getExpressionRef().getValue().cloneExpression()));
-        LogicalVariable unnestVar2 = context.newVar();
-        UnnestOperator unnest2 = new UnnestOperator(unnestVar2,
-                new MutableObject<ILogicalExpression>(unnestOpRef.getExpressionRef().getValue().cloneExpression()));
-
-        //Getting the two topmost branched and adding them as an input to the unnests:
-        Mutable<ILogicalOperator> branch1 = unionAbstractOp.getInputs().get(0);
-        ILogicalOperator agg1 = branch1.getValue();
-        List<LogicalVariable> agg1_var = new ArrayList<LogicalVariable>();
-        VariableUtilities.getLiveVariables(agg1, agg1_var);
-        Mutable<ILogicalOperator> branch2 = unionAbstractOp.getInputs().get(1);
-        ILogicalOperator agg2 = branch2.getValue();
-        List<LogicalVariable> agg2_var = new ArrayList<LogicalVariable>();
-        VariableUtilities.getLiveVariables(agg2, agg2_var);
-
-        //Modifying the unnest so it has the right variable
-        List<LogicalVariable> var_unnest_1 = new ArrayList<LogicalVariable>();
-        unnest1.getExpressionRef().getValue().getUsedVariables(var_unnest_1);
-        unnest1.getExpressionRef().getValue().substituteVar(var_unnest_1.get(0), agg1_var.get(0));
-
-        List<LogicalVariable> var_unnest2 = new ArrayList<LogicalVariable>();
-        unnest2.getExpressionRef().getValue().getUsedVariables(var_unnest2);
-        unnest2.getExpressionRef().getValue().substituteVar(var_unnest2.get(0), agg2_var.get(0));
-
-        unnest1.getInputs().add(branch1);
-        unnest2.getInputs().add(branch2);
-        context.computeAndSetTypeEnvironmentForOperator(unnest1);
-        context.computeAndSetTypeEnvironmentForOperator(unnest2);
-
-        //creating a new union operator with the updated logical variables
-        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap =
-                new ArrayList<Triple<LogicalVariable, LogicalVariable, LogicalVariable>>(1);
-        Triple<LogicalVariable, LogicalVariable, LogicalVariable> union_triple_vars =
-                new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(unnestVar1, unnestVar2,
-                        unnestOpRef.getVariables().get(0));
-        varMap.add(union_triple_vars);
-        UnionAllOperator unionOpFinal = new UnionAllOperator(varMap);
-
-        unionOpFinal.getInputs().add(new MutableObject<ILogicalOperator>(unnest1));
-        unionOpFinal.getInputs().add(new MutableObject<ILogicalOperator>(unnest2));
-
-        context.computeAndSetTypeEnvironmentForOperator(unionOpFinal);
-
-        opRef.setValue(unionOpFinal);
-        return true;
-
-    }
-}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesInUnionRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesInUnionRule.java
new file mode 100644
index 0000000..fc9bb05
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesInUnionRule.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Removes redundant variable mapping from the union-all operator as follows:
+ * <p>
+ * Before rule:
+ * <pre>
+ * union-all([$left, $right, $out_1], [$left, $right, $out_2])
+ * </pre>
+ * <p>
+ * After rule:
+ * <pre>
+ * assign $out_2 := $out_1
+ * union-all([$left, $right, $out_1])
+ * </pre>
+ * Notes:
+ * <ul>
+ * <li>
+ * The new assign operator is annotated as {@link OperatorPropertiesUtil#MOVABLE non-movable}
+ * to prevent it from being pushed back into the union-all by {@link PushMapOperatorThroughUnionRule}.
+ * It is supposed to be removed later by {@link RemoveUnusedAssignAndAggregateRule}.
+ * <li>
+ * This rule is supposed to run during logical optimization stage because
+ * it does not compute schema for the new assign operator
+ * </ul>
+ */
+public final class RemoveRedundantVariablesInUnionRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
+            return false;
+        }
+
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+
+        UnionAllOperator unionAllOp = (UnionAllOperator) op;
+        List<LogicalVariable> newAssignVars = null;
+        List<Mutable<ILogicalExpression>> newAssignExprs = null;
+
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMappings = unionAllOp.getVariableMappings();
+
+        for (int i = 0; i < varMappings.size();) {
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = varMappings.get(i);
+            LogicalVariable existingOutputVar = findVarMapping(varMappings, i, varMapping.first, varMapping.second);
+            if (existingOutputVar != null) {
+                LogicalVariable thisOutputVar = varMapping.third;
+                if (newAssignVars == null) {
+                    newAssignVars = new ArrayList<>();
+                    newAssignExprs = new ArrayList<>();
+                }
+                VariableReferenceExpression existingOutputVarRef = new VariableReferenceExpression(existingOutputVar);
+                existingOutputVarRef.setSourceLocation(unionAllOp.getSourceLocation());
+                newAssignVars.add(thisOutputVar);
+                newAssignExprs.add(new MutableObject<>(existingOutputVarRef));
+
+                varMappings.remove(i);
+            } else {
+                i++;
+            }
+        }
+
+        if (newAssignVars == null) {
+            return false;
+        }
+
+        AssignOperator newAssignOp = new AssignOperator(newAssignVars, newAssignExprs);
+        newAssignOp.setSourceLocation(unionAllOp.getSourceLocation());
+        newAssignOp.getInputs().add(new MutableObject<>(unionAllOp));
+
+        // this Assign is supposed to be removed later by RemoveUnusedAssignAndAggregateRule.
+        // mark it as non-movable to prevent PushMapThroughUnionRule from pushing it back into the UnionAll operator
+        OperatorPropertiesUtil.markMovable(newAssignOp, false);
+
+        context.computeAndSetTypeEnvironmentForOperator(unionAllOp);
+        context.computeAndSetTypeEnvironmentForOperator(newAssignOp);
+        opRef.setValue(newAssignOp);
+
+        context.addToDontApplySet(this, unionAllOp);
+
+        return true;
+    }
+
+    private static LogicalVariable findVarMapping(
+            List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMappings, int endIndexExclusive,
+            LogicalVariable firstBranchVar, LogicalVariable secondBranchVar) {
+        int n = Math.min(endIndexExclusive, varMappings.size());
+        for (int i = 0; i < n; i++) {
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> t = varMappings.get(i);
+            if (t.first.equals(firstBranchVar) && t.second.equals(secondBranchVar)) {
+                return t.third;
+            }
+        }
+        return null;
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 74e182b..9ee5a48 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -24,10 +24,11 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.algebricks.data.IUnnestingPositionWriter;
+import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
 import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
 import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
@@ -46,7 +47,7 @@
     private final int outCol;
     private final IUnnestingEvaluatorFactory unnestingFactory;
     private final boolean unnestColIsProjected;
-    private final IUnnestingPositionWriter positionWriter;
+    private final IUnnestingPositionWriterFactory positionWriterFactory;
     private final boolean leftOuter;
     private final IMissingWriterFactory missingWriterFactory;
     private int outColPos;
@@ -57,7 +58,8 @@
     }
 
     public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
-            IUnnestingPositionWriter positionWriter, boolean leftOuter, IMissingWriterFactory missingWriterFactory) {
+            IUnnestingPositionWriterFactory positionWriterFactory, boolean leftOuter,
+            IMissingWriterFactory missingWriterFactory) {
         super(projectionList);
         this.outCol = outCol;
         this.unnestingFactory = unnestingFactory;
@@ -68,7 +70,7 @@
             }
         }
         unnestColIsProjected = outColPos >= 0;
-        this.positionWriter = positionWriter;
+        this.positionWriterFactory = positionWriterFactory;
         this.leftOuter = leftOuter;
         this.missingWriterFactory = missingWriterFactory;
     }
@@ -94,6 +96,8 @@
             private IPointable p = VoidPointable.FACTORY.createPointable();
             private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
             private IUnnestingEvaluator unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
+            private final IUnnestingPositionWriter positionWriter =
+                    positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null;
 
             @Override
             public void open() throws HyracksDataException {