Support UNION ALL.

- support heterogenous input types for UNION ALL and the output is resolved to a mimimally generalized type;
- fix the partitioning property inference for UnionAllPOperator;
- fix for ASTERIXDB-1354, a bug in IsomorphismVariableMappingVisitor.

Change-Id: Ifadf1707bb2b6bed22f8fc5792c635e87291a468
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1035
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 0fba9da..eb4751d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -36,7 +36,8 @@
 import org.apache.asterix.optimizer.rules.ExtractOrderExpressionsRule;
 import org.apache.asterix.optimizer.rules.FeedScanCollectionToUnnest;
 import org.apache.asterix.optimizer.rules.FuzzyEqRule;
-import org.apache.asterix.optimizer.rules.InjectToAnyTypeCastRule;
+import org.apache.asterix.optimizer.rules.InjectTypeCastForSwitchCaseRule;
+import org.apache.asterix.optimizer.rules.InjectTypeCastForUnionRule;
 import org.apache.asterix.optimizer.rules.InlineUnnestFunctionRule;
 import org.apache.asterix.optimizer.rules.IntroduceAutogenerateIDRule;
 import org.apache.asterix.optimizer.rules.IntroduceDynamicTypeCastForExternalFunctionRule;
@@ -281,7 +282,13 @@
         planCleanupRules.add(new IntroduceDynamicTypeCastForExternalFunctionRule());
         planCleanupRules.add(new RemoveUnusedAssignAndAggregateRule());
         planCleanupRules.add(new RemoveCartesianProductWithEmptyBranchRule());
-        planCleanupRules.add(new InjectToAnyTypeCastRule());
+        planCleanupRules.add(new InjectTypeCastForSwitchCaseRule());
+        planCleanupRules.add(new InjectTypeCastForUnionRule());
+
+        // Needs to invoke ByNameToByIndexFieldAccessRule as the last logical optimization rule because
+        // some rules can push a FieldAccessByName to a place where the name it tries to access is in the closed part.
+        // For example, a possible scenario is that a field-access-by-name can be pushed down through UnionAllOperator.
+        planCleanupRules.add(new ByNameToByIndexFieldAccessRule());
         return planCleanupRules;
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectToAnyTypeCastRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java
similarity index 69%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectToAnyTypeCastRule.java
rename to asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java
index 075c1f0..2b70dcf 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectToAnyTypeCastRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java
@@ -23,12 +23,10 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.asterix.dataflow.data.common.TypeResolverUtil;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -46,7 +44,7 @@
  * This rule injects cast functions for "THEN" and "ELSE" branches of a switch-case function if
  * different "THEN" and "ELSE" branches have heterogeneous return types.
  */
-public class InjectToAnyTypeCastRule implements IAlgebraicRewriteRule {
+public class InjectTypeCastForSwitchCaseRule implements IAlgebraicRewriteRule {
 
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
@@ -57,7 +55,7 @@
         }
         // Populates the latest type information.
         context.computeAndSetTypeEnvironmentForOperator(op);
-        if (op.acceptExpressionTransform(exprRef -> injectToAnyTypeCast(op, exprRef, context))) {
+        if (op.acceptExpressionTransform(exprRef -> injectTypeCast(op, exprRef, context))) {
             // Generates the up-to-date type information.
             context.computeAndSetTypeEnvironmentForOperator(op);
             return true;
@@ -65,8 +63,9 @@
         return false;
     }
 
-    // Injects type casts to cast return expressions' return types to ANY.
-    private boolean injectToAnyTypeCast(ILogicalOperator op, Mutable<ILogicalExpression> exprRef,
+    // Injects type casts to cast return expressions' return types to a generalized type that conforms to every
+    // return type.
+    private boolean injectTypeCast(ILogicalOperator op, Mutable<ILogicalExpression> exprRef,
             IOptimizationContext context) throws AlgebricksException {
         ILogicalExpression expr = exprRef.getValue();
         if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
@@ -76,7 +75,7 @@
         AbstractFunctionCallExpression func = (AbstractFunctionCallExpression) expr;
         for (Mutable<ILogicalExpression> argRef : func.getArguments()) {
             // Recursively rewrites arguments.
-            if (injectToAnyTypeCast(op, argRef, context)) {
+            if (injectTypeCast(op, argRef, context)) {
                 context.computeAndSetTypeEnvironmentForOperator(op);
                 rewritten = true;
             }
@@ -87,29 +86,24 @@
         return rewriteSwitchCase(op, func, context);
     }
 
-    // Injects casts that cast types to ANY for different "THEN" and "ELSE" branches.
+    // Injects casts that cast types for different "THEN" and "ELSE" branches.
     private boolean rewriteSwitchCase(ILogicalOperator op, AbstractFunctionCallExpression func,
             IOptimizationContext context) throws AlgebricksException {
         IVariableTypeEnvironment env = context.getOutputTypeEnvironment(op.getInputs().get(0).getValue());
-        if (!this.isHeterogenous(func, env)) {
-            return false;
-        }
+        IAType producedType = (IAType) env.getType(func);
         List<Mutable<ILogicalExpression>> argRefs = func.getArguments();
         int argSize = argRefs.size();
         boolean rewritten = false;
         for (int argIndex = 2; argIndex < argSize; argIndex += (argIndex + 2 == argSize) ? 1 : 2) {
             Mutable<ILogicalExpression> argRef = argRefs.get(argIndex);
             IAType type = (IAType) env.getType(argRefs.get(argIndex).getValue());
-            ATypeTag tag = type.getTypeTag();
-            // Casts are only needed when the original return type is a complex type.
-            // (In the runtime, there is already a type tag for scalar types.)
-            if (tag == ATypeTag.RECORD || tag == ATypeTag.UNORDEREDLIST || tag == ATypeTag.ORDEREDLIST) {
+            if (TypeResolverUtil.needsCast(producedType, type)) {
                 ILogicalExpression argExpr = argRef.getValue();
-                // Injects a cast call to cast the data type to ANY.
+                // Injects a cast call to cast the data type to the produced type of the switch-case function call.
                 ScalarFunctionCallExpression castFunc = new ScalarFunctionCallExpression(
                         FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CAST_TYPE),
                         new ArrayList<>(Collections.singletonList(new MutableObject<>(argExpr))));
-                TypeCastUtils.setRequiredAndInputTypes(castFunc, BuiltinType.ANY, type);
+                TypeCastUtils.setRequiredAndInputTypes(castFunc, producedType, type);
                 argRef.setValue(castFunc);
                 rewritten = true;
             }
@@ -117,29 +111,4 @@
         return rewritten;
     }
 
-    // Checks whether "THEN" and "ELSE" branches return the heterogeneous types.
-    private boolean isHeterogenous(AbstractFunctionCallExpression func, IVariableTypeEnvironment env)
-            throws AlgebricksException {
-        List<Mutable<ILogicalExpression>> argRefs = func.getArguments();
-        int argSize = argRefs.size();
-        IAType currentType = null;
-        boolean heterogenous = false;
-        for (int argIndex = 2; argIndex < argSize; argIndex += (argIndex + 2 == argSize) ? 1 : 2) {
-            IAType type = (IAType) env.getType(argRefs.get(argIndex).getValue());
-            ATypeTag typeTag = type.getTypeTag();
-            // Null and missing are not considered as heterogeneous with other types.
-            if (typeTag != ATypeTag.NULL && typeTag != ATypeTag.MISSING) {
-                if (typeTag == ATypeTag.UNION) {
-                    type = ((AUnionType) type).getActualType();
-                }
-                if (currentType != null && !type.equals(currentType)) {
-                    heterogenous = true;
-                    break;
-                }
-                currentType = type;
-            }
-        }
-        return heterogenous;
-    }
-
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForUnionRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForUnionRule.java
new file mode 100644
index 0000000..67ffb1a
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForUnionRule.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.dataflow.data.common.TypeResolverUtil;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
+import org.apache.asterix.om.types.IAType;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule injects type casts for inputs of a UnionAllOperator if those
+ * inputs have heterogeneous types.
+ */
+public class InjectTypeCastForUnionRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
+            return false;
+        }
+        UnionAllOperator unionAllOperator = (UnionAllOperator) op;
+        // Injects casts to the first and second input branch of the UnionAllOperator.
+        return injectCast(unionAllOperator, 0, context) || injectCast(unionAllOperator, 1, context);
+    }
+
+    // Injects a type cast function on one input (indicated by childIndex) of the union all operator if necessary.
+    private boolean injectCast(UnionAllOperator op, int childIndex, IOptimizationContext context)
+            throws AlgebricksException {
+        // Gets the type environments for the union all operator and its child operator with the right child index.
+        IVariableTypeEnvironment env = context.getOutputTypeEnvironment(op);
+        Mutable<ILogicalOperator> branchOpRef = op.getInputs().get(childIndex);
+        IVariableTypeEnvironment childEnv = context.getOutputTypeEnvironment(branchOpRef.getValue());
+
+        // The two lists are used for the assign operator that calls cast functions.
+        List<LogicalVariable> varsToCast = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> castFunctionsForLeft = new ArrayList<>();
+
+        // Iterate through all triples.
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> triples = op.getVariableMappings();
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : triples) {
+            LogicalVariable producedVar = triple.third;
+            IAType producedType = (IAType) env.getVarType(producedVar);
+            LogicalVariable varToCast = childIndex == 0 ? triple.first : triple.second;
+            IAType inputType = (IAType) childEnv.getVarType(varToCast);
+            if (!TypeResolverUtil.needsCast(producedType, inputType)) {
+                // Continues to the next triple if no cast is neeeded.
+                continue;
+            }
+            LogicalVariable castedVar = context.newVar();
+            // Resets triple variables to new variables that bind to the results of type casting.
+            triple.first = childIndex == 0 ? castedVar : triple.first;
+            triple.second = childIndex > 0 ? castedVar : triple.second;
+            ScalarFunctionCallExpression castFunc = new ScalarFunctionCallExpression(
+                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CAST_TYPE), new ArrayList<>(Collections
+                            .singletonList(new MutableObject<>(new VariableReferenceExpression(varToCast)))));
+            TypeCastUtils.setRequiredAndInputTypes(castFunc, producedType, inputType);
+
+            // Adds the variable and function expression into lists, for the assign operator.
+            varsToCast.add(castedVar);
+            castFunctionsForLeft.add(new MutableObject<>(castFunc));
+        }
+        if (castFunctionsForLeft.isEmpty()) {
+            return false;
+        }
+        // Injects an assign operator to perform type casts.
+        AssignOperator assignOp = new AssignOperator(varsToCast, castFunctionsForLeft);
+        assignOp.getInputs().add(new MutableObject<>(branchOpRef.getValue()));
+        branchOpRef.setValue(assignOp);
+        context.computeAndSetTypeEnvironmentForOperator(assignOp);
+
+        // Returns true to indicate that rewriting happens.
+        return true;
+    }
+
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java
index b5c9018..6bcdd90 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.translator;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
@@ -32,27 +31,22 @@
 import org.apache.asterix.lang.common.base.Clause;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Expression.Kind;
+import org.apache.asterix.lang.common.base.ILangExpression;
 import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.statement.Query;
-import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 
 /**
@@ -89,7 +83,7 @@
                     pVar, BuiltinType.AINT64, new AqlPositionWriter());
         }
         returnedOp.getInputs().add(eo.second);
-        return new Pair<ILogicalOperator, LogicalVariable>(returnedOp, v);
+        return new Pair<>(returnedOp, v);
     }
 
     @Override
@@ -105,7 +99,7 @@
         }
         for (Clause c : flwor.getClauseList()) {
             Pair<ILogicalOperator, LogicalVariable> pC = c.accept(this, flworPlan);
-            flworPlan = new MutableObject<ILogicalOperator>(pC.first);
+            flworPlan = new MutableObject<>(pC.first);
         }
 
         Expression r = flwor.getReturnExpr();
@@ -117,7 +111,7 @@
             LogicalVariable var = context.getVar(v.getVar().getId());
             result = produceFlworPlan(noFlworClause, isTop, flworPlan, var);
         } else {
-            Mutable<ILogicalOperator> baseOp = new MutableObject<ILogicalOperator>(flworPlan.getValue());
+            Mutable<ILogicalOperator> baseOp = new MutableObject<>(flworPlan.getValue());
             Pair<ILogicalOperator, LogicalVariable> rRes = r.accept(this, baseOp);
             ILogicalOperator rOp = rRes.first;
             ILogicalOperator resOp;
@@ -130,7 +124,7 @@
                 resOp = s;
                 baseOp.setValue(new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(s)));
             }
-            Mutable<ILogicalOperator> resOpRef = new MutableObject<ILogicalOperator>(resOp);
+            Mutable<ILogicalOperator> resOpRef = new MutableObject<>(resOp);
             result = produceFlworPlan(noFlworClause, isTop, resOpRef, rRes.second);
         }
         if (!isTop) {
@@ -149,7 +143,7 @@
     @Override
     public Pair<ILogicalOperator, LogicalVariable> visit(DistinctClause dc, Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
-        List<Mutable<ILogicalExpression>> exprList = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> exprList = new ArrayList<>();
         Mutable<ILogicalOperator> input = null;
         for (Expression expr : dc.getDistinctByExpr()) {
             Pair<ILogicalExpression, Mutable<ILogicalOperator>> p = langExprToAlgExpression(expr, tupSource);
@@ -158,59 +152,16 @@
         }
         DistinctOperator opDistinct = new DistinctOperator(exprList);
         opDistinct.getInputs().add(input);
-        return new Pair<ILogicalOperator, LogicalVariable>(opDistinct, null);
+        return new Pair<>(opDistinct, null);
     }
 
     @Override
     public Pair<ILogicalOperator, LogicalVariable> visit(UnionExpr unionExpr, Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
-        List<Mutable<ILogicalOperator>> inputOpRefsToUnion = new ArrayList<>();
-        List<LogicalVariable> vars = new ArrayList<>();
-        for (Expression e : unionExpr.getExprs()) {
-            // Visits the expression of one branch.
-            Pair<ILogicalOperator, LogicalVariable> opAndVar = e.accept(this, tupSource);
-
-            // Creates an unnest operator.
-            LogicalVariable unnestVar = context.newVar();
-            List<Mutable<ILogicalExpression>> args = new ArrayList<>();
-            args.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(opAndVar.second)));
-            UnnestOperator unnestOp = new UnnestOperator(unnestVar,
-                    new MutableObject<ILogicalExpression>(new UnnestingFunctionCallExpression(
-                            FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), args)));
-            unnestOp.getInputs().add(new MutableObject<ILogicalOperator>(opAndVar.first));
-            inputOpRefsToUnion.add(new MutableObject<ILogicalOperator>(unnestOp));
-            vars.add(unnestVar);
-        }
-
-        // Creates a tree of binary union-all operators.
-        UnionAllOperator topUnionAllOp = null;
-        LogicalVariable topUnionVar = null;
-        Iterator<Mutable<ILogicalOperator>> inputOpRefIterator = inputOpRefsToUnion.iterator();
-        Mutable<ILogicalOperator> leftInputBranch = inputOpRefIterator.next();
-        Iterator<LogicalVariable> inputVarIterator = vars.iterator();
-        LogicalVariable leftInputVar = inputVarIterator.next();
-
-        while (inputOpRefIterator.hasNext()) {
-            // Generates the variable triple <leftVar, rightVar, outputVar> .
-            topUnionVar = context.newVar();
-            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varTriple = new Triple<>(leftInputVar,
-                    inputVarIterator.next(), topUnionVar);
-            List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varTriples = new ArrayList<>();
-            varTriples.add(varTriple);
-
-            // Creates a binary union-all operator.
-            topUnionAllOp = new UnionAllOperator(varTriples);
-            topUnionAllOp.getInputs().add(leftInputBranch);
-            topUnionAllOp.getInputs().add(inputOpRefIterator.next());
-
-            // Re-assigns leftInputBranch and leftInputVar.
-            leftInputBranch = new MutableObject<ILogicalOperator>(topUnionAllOp);
-            leftInputVar = topUnionVar;
-        }
-
-        Pair<ILogicalOperator, LogicalVariable> result = aggListifyForSubquery(topUnionVar,
-                new MutableObject<ILogicalOperator>(topUnionAllOp), false);
-        return result;
+        List<ILangExpression> inputExprs = new ArrayList<>();
+        inputExprs.addAll(unionExpr.getExprs());
+        Pair<ILogicalOperator, LogicalVariable> result = translateUnionAllFromInputExprs(inputExprs, tupSource);
+        return aggListifyForSubquery(result.second, new MutableObject<>(result.first), false);
     }
 
     private Pair<ILogicalOperator, LogicalVariable> produceFlworPlan(boolean noForClause, boolean isTop,
@@ -218,9 +169,9 @@
         if (isTop) {
             ProjectOperator pr = new ProjectOperator(resVar);
             pr.getInputs().add(resOpRef);
-            return new Pair<ILogicalOperator, LogicalVariable>(pr, resVar);
+            return new Pair<>(pr, resVar);
         } else if (noForClause) {
-            return new Pair<ILogicalOperator, LogicalVariable>(resOpRef.getValue(), resVar);
+            return new Pair<>(resOpRef.getValue(), resVar);
         } else {
             return aggListifyForSubquery(resVar, resOpRef, false);
         }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 136dd5e..5081587 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -39,6 +39,8 @@
 import org.apache.asterix.lang.aql.util.RangeMapBuilder;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Expression.Kind;
+import org.apache.asterix.lang.common.base.ILangExpression;
+import org.apache.asterix.lang.common.base.ILangExpression;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.clause.GroupbyClause;
 import org.apache.asterix.lang.common.clause.LetClause;
@@ -103,6 +105,7 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.Counter;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -141,6 +144,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
@@ -1588,4 +1592,53 @@
         return new MutableObject<>(
                 new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.AND), arguments));
     }
+
+    // Generates the plan for "UNION ALL" or union expression from its input expressions.
+    protected Pair<ILogicalOperator, LogicalVariable> translateUnionAllFromInputExprs(List<ILangExpression> inputExprs,
+            Mutable<ILogicalOperator> tupSource) throws AsterixException {
+        List<Mutable<ILogicalOperator>> inputOpRefsToUnion = new ArrayList<>();
+        List<LogicalVariable> vars = new ArrayList<>();
+        for (ILangExpression expr : inputExprs) {
+            // Visits the expression of one branch.
+            Pair<ILogicalOperator, LogicalVariable> opAndVar = expr.accept(this, tupSource);
+
+            // Creates an unnest operator.
+            LogicalVariable unnestVar = context.newVar();
+            List<Mutable<ILogicalExpression>> args = new ArrayList<>();
+            args.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(opAndVar.second)));
+            UnnestOperator unnestOp = new UnnestOperator(unnestVar,
+                    new MutableObject<ILogicalExpression>(new UnnestingFunctionCallExpression(
+                            FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), args)));
+            unnestOp.getInputs().add(new MutableObject<ILogicalOperator>(opAndVar.first));
+            inputOpRefsToUnion.add(new MutableObject<ILogicalOperator>(unnestOp));
+            vars.add(unnestVar);
+        }
+
+        // Creates a tree of binary union-all operators.
+        UnionAllOperator topUnionAllOp = null;
+        LogicalVariable topUnionVar = null;
+        Iterator<Mutable<ILogicalOperator>> inputOpRefIterator = inputOpRefsToUnion.iterator();
+        Mutable<ILogicalOperator> leftInputBranch = inputOpRefIterator.next();
+        Iterator<LogicalVariable> inputVarIterator = vars.iterator();
+        LogicalVariable leftInputVar = inputVarIterator.next();
+
+        while (inputOpRefIterator.hasNext()) {
+            // Generates the variable triple <leftVar, rightVar, outputVar> .
+            topUnionVar = context.newVar();
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varTriple =
+                    new Triple<>(leftInputVar, inputVarIterator.next(), topUnionVar);
+            List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varTriples = new ArrayList<>();
+            varTriples.add(varTriple);
+
+            // Creates a binary union-all operator.
+            topUnionAllOp = new UnionAllOperator(varTriples);
+            topUnionAllOp.getInputs().add(leftInputBranch);
+            topUnionAllOp.getInputs().add(inputOpRefIterator.next());
+
+            // Re-assigns leftInputBranch and leftInputVar.
+            leftInputBranch = new MutableObject<>(topUnionAllOp);
+            leftInputVar = topUnionVar;
+        }
+        return new Pair<>(topUnionAllOp, topUnionVar);
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index e40de4b..982a334 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.lang.common.base.Clause.ClauseType;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Expression.Kind;
+import org.apache.asterix.lang.common.base.ILangExpression;
 import org.apache.asterix.lang.common.clause.GroupbyClause;
 import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.expression.FieldBinding;
@@ -56,6 +57,9 @@
 import org.apache.asterix.lang.sqlpp.expression.IndependentSubquery;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
 import org.apache.asterix.lang.sqlpp.optype.JoinType;
+import org.apache.asterix.lang.sqlpp.optype.SetOpType;
+import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
+import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
 import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
 import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
@@ -181,13 +185,26 @@
     @Override
     public Pair<ILogicalOperator, LogicalVariable> visit(SelectSetOperation selectSetOperation,
             Mutable<ILogicalOperator> tupSource) throws AsterixException {
-        Mutable<ILogicalOperator> currentOpRef = tupSource;
-        Pair<ILogicalOperator, LogicalVariable> currentResult =
-                selectSetOperation.getLeftInput().accept(this, currentOpRef);
-        if (selectSetOperation.hasRightInputs()) {
-            throw new NotImplementedException();
+        SetOperationInput leftInput = selectSetOperation.getLeftInput();
+        if (!selectSetOperation.hasRightInputs()) {
+            return leftInput.accept(this, tupSource);
         }
-        return currentResult;
+        List<ILangExpression> inputExprs = new ArrayList<>();
+        inputExprs.add(leftInput.selectBlock()
+                ? new SelectExpression(null, new SelectSetOperation(leftInput, null), null, null, true)
+                : leftInput.getSubquery());
+        for (SetOperationRight setOperationRight : selectSetOperation.getRightInputs()) {
+            SetOpType setOpType = setOperationRight.getSetOpType();
+            if (setOpType != SetOpType.UNION || setOperationRight.isSetSemantics()) {
+                throw new AsterixException("Operation " + setOpType
+                        + (setOperationRight.isSetSemantics() ? "with set semantics" : "ALL") + " is not supported.");
+            }
+            SetOperationInput rightInput = setOperationRight.getSetOperationRightInput();
+            inputExprs.add(rightInput.selectBlock()
+                    ? new SelectExpression(null, new SelectSetOperation(rightInput, null), null, null, true)
+                    : rightInput.getSubquery());
+        }
+        return translateUnionAllFromInputExprs(inputExprs, tupSource);
     }
 
     @Override
@@ -314,7 +331,6 @@
             boolean hasRightPosVar = rightUnnestOp.getPositionalVariable() != null;
             if (hasRightPosVar) {
                 // Creates record to get correlation between the two aggregate variables.
-                @SuppressWarnings("unchecked")
                 ScalarFunctionCallExpression recordCreationFunc = new ScalarFunctionCallExpression(
                         FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR),
                         // Field name for the listified right unnest var.
@@ -369,14 +385,11 @@
             currentTopOp = outerUnnestOp;
 
             if (hasRightPosVar) {
-                @SuppressWarnings("unchecked")
                 ScalarFunctionCallExpression fieldAccessForRightUnnestVar = new ScalarFunctionCallExpression(
                         FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX),
                         new MutableObject<ILogicalExpression>(new VariableReferenceExpression(outerUnnestVar)),
                         new MutableObject<ILogicalExpression>(
                                 new ConstantExpression(new AsterixConstantValue(new AInt32(0)))));
-
-                @SuppressWarnings("unchecked")
                 ScalarFunctionCallExpression fieldAccessForRightPosVar = new ScalarFunctionCallExpression(
                         FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX),
                         new MutableObject<ILogicalExpression>(new VariableReferenceExpression(outerUnnestVar)),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index d6864c1..cd088c1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.dataflow.data.common.AqlMergeAggregationExpressionFactory;
 import org.apache.asterix.dataflow.data.common.AqlMissableTypeComputer;
 import org.apache.asterix.dataflow.data.common.AqlPartialAggregationTypeComputer;
+import org.apache.asterix.dataflow.data.common.ConflictingTypeResolver;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
 import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
@@ -63,6 +64,7 @@
 import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
@@ -151,10 +153,11 @@
                 IExpressionEvalSizeComputer expressionEvalSizeComputer,
                 IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
                 IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer missableTypeComputer,
-                PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) {
+                IConflictingTypeResolver conflictingTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig,
+                AlgebricksPartitionConstraint clusterLocations) {
             return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
                     mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
-                    physicalOptimizationConfig, clusterLocations);
+                    conflictingTypeResolver, physicalOptimizationConfig, clusterLocations);
         }
     }
 
@@ -257,6 +260,7 @@
         builder.setPartialAggregationTypeComputer(new AqlPartialAggregationTypeComputer());
         builder.setExpressionTypeComputer(AqlExpressionTypeComputer.INSTANCE);
         builder.setMissableTypeComputer(AqlMissableTypeComputer.INSTANCE);
+        builder.setConflictingTypeResolver(ConflictingTypeResolver.INSTANCE);
         builder.setClusterLocations(queryMetadataProvider.getClusterLocations());
 
         ICompiler compiler = compilerFactory.createCompiler(plan, queryMetadataProvider, t.getVarCounter());
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.1.ddl.aql
new file mode 100644
index 0000000..7ddf6b2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.1.ddl.aql
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type LineItemType as closed {
+  l_orderkey: int32,
+  l_partkey: int32,
+  l_suppkey: int32,
+  l_linenumber: int32,
+  l_quantity: int32,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int32,
+  o_custkey: int32,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int32,
+  o_comment: string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey, l_linenumber;
+
+create dataset Orders(OrderType) primary key o_orderkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.2.update.aql
new file mode 100644
index 0000000..fd005d4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dataverse test;
+
+load dataset LineItem
+using localfs
+(("path"="asterix_nc1://data/tpch0.001/lineitem.tbl"),("format"="delimited-text"),("delimiter"="|"));
+
+load dataset Orders
+using localfs
+(("path"="asterix_nc2://data/tpch0.001/orders.tbl"),("format"="delimited-text"),("delimiter"="|"));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.3.query.aql
new file mode 100644
index 0000000..4185b14
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.3.query.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dataverse test;
+
+let $l := for $r in dataset LineItem where $r.l_orderkey = 2 return $r.l_linenumber
+let $o := for $s in dataset Orders where $s.o_orderkey =2 return $s.o_custkey
+let $c := $l union $o
+
+for $i in $c
+order by $i
+return $i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.1.ddl.sqlpp
new file mode 100644
index 0000000..78c328b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.1.ddl.sqlpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create type LineItemType as closed {
+  l_orderkey: int32,
+  l_partkey: int32,
+  l_suppkey: int32,
+  l_linenumber: int32,
+  l_quantity: int32,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int32,
+  o_custkey: int32,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int32,
+  o_comment: string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey, l_linenumber;
+
+create dataset Orders(OrderType) primary key o_orderkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.2.update.sqlpp
new file mode 100644
index 0000000..26f63f9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.2.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+load dataset LineItem
+using localfs
+(("path"="asterix_nc1://data/tpch0.001/lineitem.tbl"),("format"="delimited-text"),("delimiter"="|"));
+
+load dataset Orders
+using localfs
+(("path"="asterix_nc2://data/tpch0.001/orders.tbl"),("format"="delimited-text"),("delimiter"="|"));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.3.query.sqlpp
new file mode 100644
index 0000000..9e905f0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.3.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+SELECT *
+FROM LineItem l
+WHERE l_orderkey=2
+UNION ALL
+(
+ SELECT *
+ FROM Orders o
+ WHERE o_orderkey=2
+)
+ORDER BY l; // the second branch does not have a field "l" and hence it should be MISSING.
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union/union.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union/union.3.query.sqlpp
index 43bde4c..ecf2380 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union/union.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union/union.3.query.sqlpp
@@ -17,20 +17,16 @@
  * under the License.
  */
 
-use TinySocial;
+USE TinySocial;
 
 
-with  t1 as (
-      select element t.id
-      from  FacebookUsers as t
-  ),
-      t2 as (
-      select element s.`message-id`
-      from  FacebookMessages as s
-  ),
-      c as t1
-      union
-      t2
-select element res
-from  c as res
-;
+SELECT DISTINCT VALUE id
+FROM
+(
+ SELECT VALUE t.id
+ FROM FacebookUsers AS t
+ UNION ALL
+ SELECT VALUE s.`message-id`
+ FROM FacebookMessages AS s
+) id
+ORDER BY id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_heterogeneous_scalar/union_heterogeneous_scalar.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_heterogeneous_scalar/union_heterogeneous_scalar.1.ddl.sqlpp
new file mode 100644
index 0000000..ec91694
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_heterogeneous_scalar/union_heterogeneous_scalar.1.ddl.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse TinySocial if exists;
+create  dataverse TinySocial;
+
+use TinySocial;
+
+
+create type TinySocial.FacebookUserType as
+{
+  id : int64
+}
+
+create type TinySocial.FacebookMessageType as
+{
+  `message-id` : int64
+}
+
+create  dataset FacebookUsers(FacebookUserType) primary key id;
+
+create  dataset FacebookMessages(FacebookMessageType) primary key `message-id`;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_heterogeneous_scalar/union_heterogeneous_scalar.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_heterogeneous_scalar/union_heterogeneous_scalar.2.update.sqlpp
new file mode 100644
index 0000000..ccd33a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_heterogeneous_scalar/union_heterogeneous_scalar.2.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use TinySocial;
+
+
+load  dataset FacebookUsers using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
+
+load  dataset FacebookMessages using localfs ((`path`=`asterix_nc1://data/tinysocial/fbm.adm`),(`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_heterogeneous_scalar/union_heterogeneous_scalar.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_heterogeneous_scalar/union_heterogeneous_scalar.3.query.sqlpp
new file mode 100644
index 0000000..2d12187
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_heterogeneous_scalar/union_heterogeneous_scalar.3.query.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE TinySocial;
+
+
+SELECT DISTINCT VALUE id
+FROM
+(
+ SELECT VALUE t.id
+ FROM FacebookUsers AS t
+ UNION ALL
+ SELECT VALUE s.`message-id`
+ FROM FacebookMessages AS s
+ UNION ALL
+ SELECT VALUE t.name
+ FROM FacebookUsers AS t
+) id
+ORDER BY id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative/union_negative.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative/union_negative.1.ddl.sqlpp
new file mode 100644
index 0000000..6352aa9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative/union_negative.1.ddl.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse TinySocial if exists;
+create  dataverse TinySocial;
+
+use TinySocial;
+
+
+create type TinySocial.FacebookUserType as
+{
+  id : int64
+}
+
+create  dataset FacebookUsers(FacebookUserType) primary key id;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative/union_negative.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative/union_negative.2.update.sqlpp
new file mode 100644
index 0000000..ae8e96d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative/union_negative.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use TinySocial;
+
+
+load  dataset FacebookUsers using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative/union_negative.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative/union_negative.3.query.sqlpp
new file mode 100644
index 0000000..aa2de43
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative/union_negative.3.query.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE TinySocial;
+
+
+SELECT DISTINCT VALUE id
+FROM
+(
+ SELECT VALUE t.id
+ FROM FacebookUsers AS t
+ UNION ALL
+ SELECT VALUE s.`message-id`
+ FROM t AS s // The reference to "t" here is not valid.
+) id
+ORDER BY id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative_2/union_negative_2.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative_2/union_negative_2.1.ddl.sqlpp
new file mode 100644
index 0000000..ec91694
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative_2/union_negative_2.1.ddl.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse TinySocial if exists;
+create  dataverse TinySocial;
+
+use TinySocial;
+
+
+create type TinySocial.FacebookUserType as
+{
+  id : int64
+}
+
+create type TinySocial.FacebookMessageType as
+{
+  `message-id` : int64
+}
+
+create  dataset FacebookUsers(FacebookUserType) primary key id;
+
+create  dataset FacebookMessages(FacebookMessageType) primary key `message-id`;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative_2/union_negative_2.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative_2/union_negative_2.2.update.sqlpp
new file mode 100644
index 0000000..ccd33a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative_2/union_negative_2.2.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use TinySocial;
+
+
+load  dataset FacebookUsers using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
+
+load  dataset FacebookMessages using localfs ((`path`=`asterix_nc1://data/tinysocial/fbm.adm`),(`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative_2/union_negative_2.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative_2/union_negative_2.3.query.sqlpp
new file mode 100644
index 0000000..bd68de8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_negative_2/union_negative_2.3.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE TinySocial;
+
+SELECT t.id id
+FROM FacebookUsers AS t
+UNION ALL
+SELECT s.`message-id` id
+FROM FacebookMessages AS s
+UNION ALL
+SELECT t.name id
+FROM FacebookUsers AS t
+ORDER BY t.id; // There is certainly no field called "t" in the union all results.
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby/union_orderby.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby/union_orderby.1.ddl.sqlpp
new file mode 100644
index 0000000..bdfed09
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby/union_orderby.1.ddl.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse TinySocial if exists;
+create  dataverse TinySocial;
+
+use TinySocial;
+
+
+create type TinySocial.FacebookUserType as
+{
+  id : int64
+}
+
+create type TinySocial.FacebookMessageType as
+{
+  `message-id` : int64
+}
+
+create  dataset FacebookUsers(FacebookUserType) primary key id;
+
+create  dataset FacebookUsers2(FacebookUserType) primary key id;
+
+create  dataset FacebookMessages(FacebookMessageType) primary key `message-id`;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby/union_orderby.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby/union_orderby.2.update.sqlpp
new file mode 100644
index 0000000..ccd33a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby/union_orderby.2.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use TinySocial;
+
+
+load  dataset FacebookUsers using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
+
+load  dataset FacebookMessages using localfs ((`path`=`asterix_nc1://data/tinysocial/fbm.adm`),(`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby/union_orderby.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby/union_orderby.3.query.sqlpp
new file mode 100644
index 0000000..7de4c96
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby/union_orderby.3.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE TinySocial;
+
+SELECT VALUE tmp
+FROM (
+ SELECT t.id id
+ FROM FacebookUsers AS t
+ UNION ALL
+ SELECT s.`message-id` id
+ FROM FacebookMessages AS s
+) tmp
+ORDER BY id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_2/union_orderby_2.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_2/union_orderby_2.1.ddl.sqlpp
new file mode 100644
index 0000000..bdfed09
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_2/union_orderby_2.1.ddl.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse TinySocial if exists;
+create  dataverse TinySocial;
+
+use TinySocial;
+
+
+create type TinySocial.FacebookUserType as
+{
+  id : int64
+}
+
+create type TinySocial.FacebookMessageType as
+{
+  `message-id` : int64
+}
+
+create  dataset FacebookUsers(FacebookUserType) primary key id;
+
+create  dataset FacebookUsers2(FacebookUserType) primary key id;
+
+create  dataset FacebookMessages(FacebookMessageType) primary key `message-id`;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_2/union_orderby_2.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_2/union_orderby_2.2.update.sqlpp
new file mode 100644
index 0000000..ccd33a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_2/union_orderby_2.2.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use TinySocial;
+
+
+load  dataset FacebookUsers using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
+
+load  dataset FacebookMessages using localfs ((`path`=`asterix_nc1://data/tinysocial/fbm.adm`),(`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_2/union_orderby_2.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_2/union_orderby_2.3.query.sqlpp
new file mode 100644
index 0000000..4936ba0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_2/union_orderby_2.3.query.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE TinySocial;
+
+SELECT t.id id
+FROM FacebookUsers AS t
+UNION ALL
+SELECT s.`message-id` id
+FROM FacebookMessages AS s
+UNION ALL
+(
+  SELECT t.name id
+  FROM FacebookUsers AS t
+)
+ORDER BY id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_3/union_orderby_3.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_3/union_orderby_3.1.ddl.sqlpp
new file mode 100644
index 0000000..bdfed09
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_3/union_orderby_3.1.ddl.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse TinySocial if exists;
+create  dataverse TinySocial;
+
+use TinySocial;
+
+
+create type TinySocial.FacebookUserType as
+{
+  id : int64
+}
+
+create type TinySocial.FacebookMessageType as
+{
+  `message-id` : int64
+}
+
+create  dataset FacebookUsers(FacebookUserType) primary key id;
+
+create  dataset FacebookUsers2(FacebookUserType) primary key id;
+
+create  dataset FacebookMessages(FacebookMessageType) primary key `message-id`;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_3/union_orderby_3.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_3/union_orderby_3.2.update.sqlpp
new file mode 100644
index 0000000..ccd33a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_3/union_orderby_3.2.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use TinySocial;
+
+
+load  dataset FacebookUsers using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
+
+load  dataset FacebookMessages using localfs ((`path`=`asterix_nc1://data/tinysocial/fbm.adm`),(`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_3/union_orderby_3.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_3/union_orderby_3.3.query.sqlpp
new file mode 100644
index 0000000..e65327c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_3/union_orderby_3.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE TinySocial;
+
+SELECT t.id id
+FROM FacebookUsers AS t
+UNION ALL
+SELECT s.`message-id` id
+FROM FacebookMessages AS s
+UNION ALL
+SELECT t.name id
+FROM FacebookUsers AS t
+UNION ALL
+SELECT foo id
+FROM [{"first-name":"a", "last-name":"b"}, ["c", "d"], [1234]] AS foo
+ORDER BY id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_4/union_orderby_4.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_4/union_orderby_4.1.ddl.sqlpp
new file mode 100644
index 0000000..bdfed09
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_4/union_orderby_4.1.ddl.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse TinySocial if exists;
+create  dataverse TinySocial;
+
+use TinySocial;
+
+
+create type TinySocial.FacebookUserType as
+{
+  id : int64
+}
+
+create type TinySocial.FacebookMessageType as
+{
+  `message-id` : int64
+}
+
+create  dataset FacebookUsers(FacebookUserType) primary key id;
+
+create  dataset FacebookUsers2(FacebookUserType) primary key id;
+
+create  dataset FacebookMessages(FacebookMessageType) primary key `message-id`;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_4/union_orderby_4.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_4/union_orderby_4.2.update.sqlpp
new file mode 100644
index 0000000..ccd33a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_4/union_orderby_4.2.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use TinySocial;
+
+
+load  dataset FacebookUsers using localfs ((`path`=`asterix_nc1://data/tinysocial/fbu.adm`),(`format`=`adm`));
+
+load  dataset FacebookMessages using localfs ((`path`=`asterix_nc1://data/tinysocial/fbm.adm`),(`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_4/union_orderby_4.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_4/union_orderby_4.3.query.sqlpp
new file mode 100644
index 0000000..c975fa4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_orderby_4/union_orderby_4.3.query.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE TinySocial;
+
+
+SELECT foo id
+FROM [{"first-name":"a", "last-name":"b"}, ["c", "d"], [1234]] AS foo
+UNION ALL
+SELECT t.id id
+FROM FacebookUsers AS t
+UNION ALL
+SELECT s.`message-id` id
+FROM FacebookMessages AS s
+UNION ALL
+SELECT t.name id
+FROM FacebookUsers AS t
+ORDER BY id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.1.adm
new file mode 100644
index 0000000..3d494b8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/query-ASTERIXDB-1354-2/query-ASTERIXDB-1354-2.1.adm
@@ -0,0 +1,2 @@
+{ "o": { "o_orderkey": 2, "o_custkey": 79, "o_orderstatus": "O", "o_totalprice": 40183.29, "o_orderdate": "1996-12-01", "o_orderpriority": "1-URGENT", "o_clerk": "Clerk#000000880", "o_shippriority": 0, "o_comment": " foxes. pending accounts at the pending, silent asymptot" } }
+{ "l": { "l_orderkey": 2, "l_partkey": 107, "l_suppkey": 2, "l_linenumber": 1, "l_quantity": 38, "l_extendedprice": 38269.8, "l_discount": 0.0, "l_tax": 0.05, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1997-01-28", "l_commitdate": "1997-01-14", "l_receiptdate": "1997-02-02", "l_shipinstruct": "TAKE BACK RETURN", "l_shipmode": "RAIL", "l_comment": "ven requests. deposits breach a" } }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.1.adm
new file mode 100644
index 0000000..9f5fa37
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/query-ASTERIXDB-1354/query-ASTERIXDB-1354.1.adm
@@ -0,0 +1,2 @@
+1
+79
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_heterogeneous_scalar/union_heterogeneous_scalar.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_heterogeneous_scalar/union_heterogeneous_scalar.1.adm
new file mode 100644
index 0000000..2d5e9f6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_heterogeneous_scalar/union_heterogeneous_scalar.1.adm
@@ -0,0 +1,25 @@
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+"BramHatch"
+"EmoryUnk"
+"IsbelDull"
+"MargaritaStoddard"
+"NicholasStroh"
+"NilaMilliron"
+"SuzannaTillson"
+"VonKemble"
+"WillisWynne"
+"WoodrowNehling"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_orderby/union_orderby.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_orderby/union_orderby.1.adm
new file mode 100644
index 0000000..58385a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_orderby/union_orderby.1.adm
@@ -0,0 +1,25 @@
+{ "id": 1 }
+{ "id": 1 }
+{ "id": 2 }
+{ "id": 2 }
+{ "id": 3 }
+{ "id": 3 }
+{ "id": 4 }
+{ "id": 4 }
+{ "id": 5 }
+{ "id": 5 }
+{ "id": 6 }
+{ "id": 6 }
+{ "id": 7 }
+{ "id": 7 }
+{ "id": 8 }
+{ "id": 8 }
+{ "id": 9 }
+{ "id": 9 }
+{ "id": 10 }
+{ "id": 10 }
+{ "id": 11 }
+{ "id": 12 }
+{ "id": 13 }
+{ "id": 14 }
+{ "id": 15 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_orderby_2/union_orderby_2.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_orderby_2/union_orderby_2.1.adm
new file mode 100644
index 0000000..6286610
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_orderby_2/union_orderby_2.1.adm
@@ -0,0 +1,35 @@
+{ "id": 1 }
+{ "id": 1 }
+{ "id": 2 }
+{ "id": 2 }
+{ "id": 3 }
+{ "id": 3 }
+{ "id": 4 }
+{ "id": 4 }
+{ "id": 5 }
+{ "id": 5 }
+{ "id": 6 }
+{ "id": 6 }
+{ "id": 7 }
+{ "id": 7 }
+{ "id": 8 }
+{ "id": 8 }
+{ "id": 9 }
+{ "id": 9 }
+{ "id": 10 }
+{ "id": 10 }
+{ "id": 11 }
+{ "id": 12 }
+{ "id": 13 }
+{ "id": 14 }
+{ "id": 15 }
+{ "id": "BramHatch" }
+{ "id": "EmoryUnk" }
+{ "id": "IsbelDull" }
+{ "id": "MargaritaStoddard" }
+{ "id": "NicholasStroh" }
+{ "id": "NilaMilliron" }
+{ "id": "SuzannaTillson" }
+{ "id": "VonKemble" }
+{ "id": "WillisWynne" }
+{ "id": "WoodrowNehling" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_orderby_3/union_orderby_3.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_orderby_3/union_orderby_3.1.adm
new file mode 100644
index 0000000..f3e8db2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_orderby_3/union_orderby_3.1.adm
@@ -0,0 +1,38 @@
+{ "id": 1 }
+{ "id": 1 }
+{ "id": 2 }
+{ "id": 2 }
+{ "id": 3 }
+{ "id": 3 }
+{ "id": 4 }
+{ "id": 4 }
+{ "id": 5 }
+{ "id": 5 }
+{ "id": 6 }
+{ "id": 6 }
+{ "id": 7 }
+{ "id": 7 }
+{ "id": 8 }
+{ "id": 8 }
+{ "id": 9 }
+{ "id": 9 }
+{ "id": 10 }
+{ "id": 10 }
+{ "id": 11 }
+{ "id": 12 }
+{ "id": 13 }
+{ "id": 14 }
+{ "id": 15 }
+{ "id": "BramHatch" }
+{ "id": "EmoryUnk" }
+{ "id": "IsbelDull" }
+{ "id": "MargaritaStoddard" }
+{ "id": "NicholasStroh" }
+{ "id": "NilaMilliron" }
+{ "id": "SuzannaTillson" }
+{ "id": "VonKemble" }
+{ "id": "WillisWynne" }
+{ "id": "WoodrowNehling" }
+{ "id": [ 1234 ] }
+{ "id": [ "c", "d" ] }
+{ "id": { "first-name": "a", "last-name": "b" } }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 9f98434..82f5071 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -380,6 +380,11 @@
         <output-dir compare="Text">query-ASTERIXDB-1047</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="union">
+      <compilation-unit name="query-ASTERIXDB-1354">
+        <output-dir compare="Text">query-ASTERIXDB-1354</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="aggregate">
     <test-case FilePath="aggregate">
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 9021994..ff00ea5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -75,15 +75,6 @@
       </compilation-unit>
     </test-case>
   </test-group>
-  <!--
-    <test-group name="union">
-        <test-case FilePath="union">
-            <compilation-unit name="union">
-                <output-dir compare="Text">union</output-dir>
-            </compilation-unit>
-        </test-case>
-    </test-group>
-    -->
   <test-case FilePath="flwor">
     <compilation-unit name="let33">
       <output-dir compare="Text">let33</output-dir>
@@ -7629,4 +7620,53 @@
       </compilation-unit>
     </test-case>
   </test-group>
+  <test-group name="union">
+    <test-case FilePath="union">
+      <compilation-unit name="union">
+        <output-dir compare="Text">union</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="union">
+      <compilation-unit name="union_heterogeneous_scalar">
+        <output-dir compare="Text">union_heterogeneous_scalar</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="union">
+      <compilation-unit name="union_negative">
+        <output-dir compare="Text">union</output-dir>
+        <expected-error>Cannot find dataset t in dataverse TinySocial nor an alias with name t</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="union">
+      <compilation-unit name="union_negative_2">
+        <output-dir compare="Text">union</output-dir>
+        <expected-error>Undefined alias (variable) reference for identifier t</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="union">
+      <compilation-unit name="union_orderby">
+        <output-dir compare="Text">union_orderby</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="union">
+      <compilation-unit name="union_orderby_2">
+        <output-dir compare="Text">union_orderby_2</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="union">
+      <compilation-unit name="union_orderby_3">
+        <output-dir compare="Text">union_orderby_3</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="union">
+      <compilation-unit name="union_orderby_4">
+        <output-dir compare="Text">union_orderby_3</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="union">
+      <compilation-unit name="query-ASTERIXDB-1354-2">
+        <output-dir compare="Text">query-ASTERIXDB-1354-2</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppFunctionBodyRewriter.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppFunctionBodyRewriter.java
index 6ddfa40..43fa4a2 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppFunctionBodyRewriter.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppFunctionBodyRewriter.java
@@ -52,6 +52,9 @@
         // Group-by core/sugar rewrites.
         rewriteGroupBys();
 
+        // Rewrites set operations.
+        rewriteSetOperations();
+
         // Rewrites like/not-like expressions.
         rewriteOperatorExpression();
 
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
index c85097d..3f7cb31 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
@@ -54,6 +54,7 @@
 import org.apache.asterix.lang.sqlpp.rewrites.visitor.InlineColumnAliasVisitor;
 import org.apache.asterix.lang.sqlpp.rewrites.visitor.InlineWithExpressionVisitor;
 import org.apache.asterix.lang.sqlpp.rewrites.visitor.OperatorExpressionVisitor;
+import org.apache.asterix.lang.sqlpp.rewrites.visitor.SetOperationVisitor;
 import org.apache.asterix.lang.sqlpp.rewrites.visitor.SqlppBuiltinFunctionRewriteVisitor;
 import org.apache.asterix.lang.sqlpp.rewrites.visitor.SqlppGlobalAggregationSugarVisitor;
 import org.apache.asterix.lang.sqlpp.rewrites.visitor.SqlppGroupByVisitor;
@@ -115,6 +116,9 @@
         // Group-by core/sugar rewrites.
         rewriteGroupBys();
 
+        // Rewrites set operations.
+        rewriteSetOperations();
+
         // Rewrites like/not-like expressions.
         rewriteOperatorExpression();
 
@@ -189,6 +193,15 @@
         substituteGbyExprVisitor.visit(topExpr, null);
     }
 
+    protected void rewriteSetOperations() throws AsterixException {
+        if (topExpr == null) {
+            return;
+        }
+        // Rewrites set operation queries that contain order-by and limit clauses.
+        SetOperationVisitor setOperationVisitor = new SetOperationVisitor(context);
+        setOperationVisitor.visit(topExpr, null);
+    }
+
     protected void rewriteOperatorExpression() throws AsterixException {
         if (topExpr == null) {
             return;
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/InlineColumnAliasVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/InlineColumnAliasVisitor.java
index 4aa25d7..ea0aa86 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/InlineColumnAliasVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/InlineColumnAliasVisitor.java
@@ -274,17 +274,20 @@
             }
         }
 
-        // Visits selectSetOperation.
-        selectExpression.getSelectSetOperation().accept(this, overwriteWithGbyKeyVarRefs);
+        // Visits selectSetOperation
+        SelectSetOperation selectSetOperation = selectExpression.getSelectSetOperation();
+        selectSetOperation.accept(this, overwriteWithGbyKeyVarRefs);
 
-        // Visits order by.
-        if (selectExpression.hasOrderby()) {
-            selectExpression.getOrderbyClause().accept(this, overwriteWithGbyKeyVarRefs);
-        }
-
-        // Visits limit.
-        if (selectExpression.hasLimit()) {
-            selectExpression.getLimitClause().accept(this, overwriteWithGbyKeyVarRefs);
+        // If there is a UNION in the selectSetOperation, we cannot overwrite order by or limit.
+        if (!selectSetOperation.hasRightInputs()) {
+            // Visits order by.
+            if (selectExpression.hasOrderby()) {
+                selectExpression.getOrderbyClause().accept(this, overwriteWithGbyKeyVarRefs);
+            }
+            // Visits limit.
+            if (selectExpression.hasLimit()) {
+                selectExpression.getLimitClause().accept(this, overwriteWithGbyKeyVarRefs);
+            }
         }
 
         // Exits the scope that were entered within this select expression
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SetOperationVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SetOperationVisitor.java
new file mode 100644
index 0000000..66bfecd
--- /dev/null
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SetOperationVisitor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.lang.sqlpp.rewrites.visitor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.ILangExpression;
+import org.apache.asterix.lang.common.clause.LimitClause;
+import org.apache.asterix.lang.common.clause.OrderbyClause;
+import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.rewrites.LangRewritingContext;
+import org.apache.asterix.lang.sqlpp.clause.FromClause;
+import org.apache.asterix.lang.sqlpp.clause.FromTerm;
+import org.apache.asterix.lang.sqlpp.clause.SelectBlock;
+import org.apache.asterix.lang.sqlpp.clause.SelectClause;
+import org.apache.asterix.lang.sqlpp.clause.SelectElement;
+import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
+import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
+import org.apache.asterix.lang.sqlpp.visitor.base.AbstractSqlppExpressionScopingVisitor;
+
+/**
+ * This visitor rewrites set operation queries with order by and limit into
+ * a nested query where the set operation part is a subquery in the from clause.
+ * In this way, there is no special variable scoping mechanism that is needed
+ * for order by and limit clauses after the set operation.
+ */
+/*
+For example, the following query
+
+SELECT ... FROM ...
+UNION ALL
+SELECT ... FROM ...
+ORDER BY foo
+Limit 5;
+
+is rewritten into the following form:
+
+SELECT VALUE v
+FROM (
+   SELECT ... FROM ...
+   UNION ALL
+   SELECT ... FROM ...
+ ) AS v
+ORDER BY foo
+LIMIT 5;
+*/
+public class SetOperationVisitor extends AbstractSqlppExpressionScopingVisitor {
+
+    public SetOperationVisitor(LangRewritingContext context) {
+        super(context);
+    }
+
+    @Override
+    public Expression visit(SelectExpression selectExpression, ILangExpression arg) throws AsterixException {
+        // Recursively visit nested select expressions.
+        SelectSetOperation selectSetOperation = selectExpression.getSelectSetOperation();
+        if (!selectSetOperation.hasRightInputs() || !(selectExpression.hasOrderby() || selectExpression.hasLimit())) {
+            return super.visit(selectExpression, arg);
+        }
+        OrderbyClause orderBy = selectExpression.getOrderbyClause();
+        LimitClause limit = selectExpression.getLimitClause();
+
+        // Wraps the set operation part with a subquery.
+        SelectExpression nestedSelectExpression = new SelectExpression(null, selectSetOperation, null, null, true);
+        VariableExpr newBindingVar = new VariableExpr(context.newVariable()); // Binding variable for the subquery.
+        FromTerm newFromTerm = new FromTerm(nestedSelectExpression, newBindingVar, null, null);
+        FromClause newFromClause = new FromClause(new ArrayList<>(Collections.singletonList(newFromTerm)));
+        SelectClause selectClause = new SelectClause(new SelectElement(newBindingVar), null, false);
+        SelectBlock selectBlock = new SelectBlock(selectClause, newFromClause, null, null, null, null, null);
+        SelectSetOperation newSelectSetOperation =
+                new SelectSetOperation(new SetOperationInput(selectBlock, null), null);
+
+        // Puts together the generated select-from-where query and order by/limit.
+        SelectExpression newSelectExpression = new SelectExpression(selectExpression.getLetList(),
+                newSelectSetOperation, orderBy, limit, selectExpression.isSubquery());
+        return super.visit(newSelectExpression, arg);
+    }
+
+}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupByVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupByVisitor.java
index 36463cb..6575752 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupByVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupByVisitor.java
@@ -106,7 +106,7 @@
             withVarSet.remove(selectBlock.getGroupbyClause().getGroupVar());
 
             Set<VariableExpr> allVisableVars = SqlppVariableUtil
-                    .getLiveUserDefinedVariables(scopeChecker.getCurrentScope());
+                    .getLiveVariables(scopeChecker.getCurrentScope());
             if (selectBlock.hasLetClausesAfterGroupby()) {
                 List<LetClause> letListAfterGby = selectBlock.getLetListAfterGroupby();
                 for (LetClause letClauseAfterGby : letListAfterGby) {
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java
index d250e08..79c99b8 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java
@@ -102,7 +102,7 @@
         if (!rewriteNeeded(varExpr)) {
             return varExpr;
         }
-        Set<VariableExpr> liveVars = SqlppVariableUtil.getLiveUserDefinedVariables(scopeChecker.getCurrentScope());
+        Set<VariableExpr> liveVars = SqlppVariableUtil.getLiveVariables(scopeChecker.getCurrentScope());
         boolean resolveAsDataset = resolveDatasetFirst(arg) && datasetExists(dataverseName, datasetName);
         if (resolveAsDataset) {
             return wrapWithDatasetFunction(dataverseName, datasetName);
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppVariableUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppVariableUtil.java
index a3e8447..b2a07e1 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppVariableUtil.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppVariableUtil.java
@@ -18,7 +18,9 @@
  */
 package org.apache.asterix.lang.sqlpp.util;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -75,23 +77,16 @@
         return varName;
     }
 
-    public static Set<VariableExpr> getLiveUserDefinedVariables(Scope scope) {
+    public static Set<VariableExpr> getLiveVariables(Scope scope) {
         Set<VariableExpr> results = new HashSet<>();
         Set<VariableExpr> liveVars = scope.getLiveVariables();
         Iterator<VariableExpr> liveVarIter = liveVars.iterator();
         while (liveVarIter.hasNext()) {
-            VariableExpr var = liveVarIter.next();
-            if (SqlppVariableUtil.isUserDefinedVariable(var)) {
-                results.add(var);
-            }
+            results.add(liveVarIter.next());
         }
         return results;
     }
 
-    private static boolean isUserDefinedVariable(VariableExpr varExpr) {
-        return varExpr.getVar().getValue().startsWith(USER_VAR_PREFIX);
-    }
-
     public static String toInternalVariableName(String varName) {
         return USER_VAR_PREFIX + varName;
     }
@@ -108,10 +103,10 @@
     }
 
     public static Collection<VariableExpr> getBindingVariables(FromClause fromClause) {
-        Set<VariableExpr> bindingVars = new HashSet<>();
         if (fromClause == null) {
-            return bindingVars;
+            return Collections.emptyList();
         }
+        List<VariableExpr> bindingVars = new ArrayList<>();
         for (FromTerm fromTerm : fromClause.getFromTerms()) {
             bindingVars.addAll(getBindingVariables(fromTerm));
         }
@@ -119,7 +114,7 @@
     }
 
     public static Collection<VariableExpr> getBindingVariables(FromTerm fromTerm) {
-        Set<VariableExpr> bindingVars = new HashSet<>();
+        List<VariableExpr> bindingVars = new ArrayList<>();
         if (fromTerm == null) {
             return bindingVars;
         }
@@ -137,7 +132,7 @@
     }
 
     public static Collection<VariableExpr> getBindingVariables(GroupbyClause gbyClause) {
-        Set<VariableExpr> bindingVars = new HashSet<>();
+        List<VariableExpr> bindingVars = new ArrayList<>();
         if (gbyClause == null) {
             return bindingVars;
         }
@@ -159,7 +154,7 @@
     }
 
     public static Collection<VariableExpr> getBindingVariables(List<LetClause> letClauses) {
-        Set<VariableExpr> bindingVars = new HashSet<>();
+        List<VariableExpr> bindingVars = new ArrayList<>();
         if (letClauses == null || letClauses.isEmpty()) {
             return bindingVars;
         }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java
index 4fbb697..13ec855 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java
@@ -177,10 +177,22 @@
 
     @Override
     public Expression visit(SelectSetOperation selectSetOperation, ILangExpression arg) throws AsterixException {
+        Scope scopeBeforeCurrentBranch = scopeChecker.getCurrentScope();
+        scopeChecker.createNewScope();
         selectSetOperation.getLeftInput().accept(this, arg);
-        for (SetOperationRight right : selectSetOperation.getRightInputs()) {
-            scopeChecker.createNewScope();
-            right.getSetOperationRightInput().accept(this, arg);
+        if (selectSetOperation.hasRightInputs()) {
+            for (SetOperationRight right : selectSetOperation.getRightInputs()) {
+                // Exit scopes that were entered within a previous select expression
+                while (scopeChecker.getCurrentScope() != scopeBeforeCurrentBranch) {
+                    scopeChecker.removeCurrentScope();
+                }
+                scopeChecker.createNewScope();
+                right.getSetOperationRightInput().accept(this, arg);
+            }
+            // Exit scopes that were entered within the last branch of the set operation.
+            while (scopeChecker.getCurrentScope() != scopeBeforeCurrentBranch) {
+                scopeChecker.removeCurrentScope();
+            }
         }
         return null;
     }
@@ -274,7 +286,7 @@
         // variables defined in the parent scope.
         Scope scope = new Scope(scopeChecker, scopeChecker.getCurrentScope(), true);
         scopeChecker.pushExistingScope(scope);
-        independentSubquery.setExpr(independentSubquery.getExpr().accept(this, arg));
+        independentSubquery.setExpr(independentSubquery.getExpr().accept(this, independentSubquery));
         scopeChecker.removeCurrentScope();
         return independentSubquery;
     }
@@ -283,8 +295,8 @@
     public Expression visit(QuantifiedExpression qe, ILangExpression arg) throws AsterixException {
         scopeChecker.createNewScope();
         for (QuantifiedPair pair : qe.getQuantifiedList()) {
-            scopeChecker.getCurrentScope().addNewVarSymbolToScope(pair.getVarExpr().getVar());
             pair.setExpr(pair.getExpr().accept(this, qe));
+            scopeChecker.getCurrentScope().addNewVarSymbolToScope(pair.getVarExpr().getVar());
         }
         qe.setSatisfiesExpr(qe.getSatisfiesExpr().accept(this, qe));
         scopeChecker.removeCurrentScope();
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/ConflictingTypeResolver.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/ConflictingTypeResolver.java
new file mode 100644
index 0000000..fda032c
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/ConflictingTypeResolver.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.dataflow.data.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
+
+/**
+ * The AsterixDB implementation for IConflictingTypeResolver.
+ */
+public class ConflictingTypeResolver implements IConflictingTypeResolver {
+
+    public static final ConflictingTypeResolver INSTANCE = new ConflictingTypeResolver();
+
+    private ConflictingTypeResolver() {
+    }
+
+    @Override
+    public Object resolve(Object... inputTypes) {
+        List<IAType> types = new ArrayList<>();
+        for (Object object : inputTypes) {
+            types.add((IAType) object);
+        }
+        return TypeResolverUtil.resolve(types);
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
new file mode 100644
index 0000000..6d66073
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.dataflow.data.common;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+/**
+ * A common facility for resolving conflicting types.
+ * It is shared between the <code>ConflictingTypeResolver</code> and <code>SwitchCaseComputer</code>.
+ */
+public class TypeResolverUtil {
+
+    private TypeResolverUtil() {
+    }
+
+    /**
+     * Returns a minimally generalized type that conforms to all input types.
+     *
+     * @param inputTypes,
+     *            a list of input types
+     * @return a generalized type that conforms to all input types.
+     */
+    public static IAType resolve(List<IAType> inputTypes) {
+        IAType currentType = null;
+        for (IAType type : inputTypes) {
+            currentType = currentType == null ? type : generalizeTypes(currentType, type);
+        }
+        return currentType;
+    }
+
+    /**
+     * Decides whether a type cast is needed to covert data instances from the input type to the required type.
+     *
+     * @param reqType,
+     *            the required type.
+     * @param inputType,
+     *            the input type.
+     * @return true of a type cast is needed, false otherwise.
+     */
+    public static boolean needsCast(IAType reqType, IAType inputType) {
+        ATypeTag tag = inputType.getTypeTag();
+        // Gets the actual input type regardless of MISSING and NULL.
+        if (tag == ATypeTag.UNION) {
+            tag = ((AUnionType) inputType).getActualType().getTypeTag();
+        }
+        // Casts are only needed when the original return type is a complex type.
+        // (In the runtime, there is already a type tag for scalar types.)
+        if (tag != ATypeTag.RECORD && tag != ATypeTag.UNORDEREDLIST && tag != ATypeTag.ORDEREDLIST) {
+            return false;
+        }
+        return !TypeComputeUtils.getActualType(reqType).equals(TypeComputeUtils.getActualType(inputType));
+    }
+
+    // Generalizes two input types.
+    private static IAType generalizeTypes(IAType inputLeftType, IAType inputRightType) {
+        IAType leftType = inputLeftType;
+        IAType rightType = inputRightType;
+        ATypeTag leftTypeTag = leftType.getTypeTag();
+        ATypeTag rightTypeTag = rightType.getTypeTag();
+        boolean unknownable = false;
+
+        // Gets the actual types for UNIONs and mark unknownable to be true.
+        if (leftTypeTag == ATypeTag.UNION || rightTypeTag == ATypeTag.UNION) {
+            leftType = TypeComputeUtils.getActualType(leftType);
+            rightType = TypeComputeUtils.getActualType(leftType);
+            leftTypeTag = leftType.getTypeTag();
+            rightTypeTag = rightType.getTypeTag();
+            unknownable = true;
+        }
+        if (leftType.equals(rightType)) {
+            return unknownable ? AUnionType.createUnknownableType(leftType) : leftType;
+        }
+
+        // Deals with the case one input type is null or missing.
+        if (leftTypeTag == ATypeTag.MISSING || leftTypeTag == ATypeTag.NULL) {
+            return AUnionType.createUnknownableType(leftType);
+        }
+        if (rightTypeTag == ATypeTag.MISSING || rightTypeTag == ATypeTag.NULL) {
+            return AUnionType.createUnknownableType(leftType);
+        }
+
+        // If two input types have different type tags (UNION/NULL/MISSING have been excluded), we return ANY here.
+        if (leftTypeTag != rightTypeTag) {
+            return BuiltinType.ANY;
+        }
+
+        // If two input types have the same type tag but are not equal, they can only be complex types.
+        IAType generalizedComplexType = generalizeComplexTypes(leftTypeTag, leftType, rightType);
+        return unknownable ? AUnionType.createUnknownableType(generalizedComplexType) : generalizedComplexType;
+    }
+
+    // Generalizes two complex types, e.g., record, ordered list and unordered list.
+    private static IAType generalizeComplexTypes(ATypeTag typeTag, IAType leftType, IAType rightType) {
+        switch (typeTag) {
+            case RECORD:
+                return generalizeRecordTypes((ARecordType) leftType, (ARecordType) rightType);
+            case ORDEREDLIST:
+                return generalizeOrderedListTypes((AOrderedListType) leftType, (AOrderedListType) rightType);
+            case UNORDEREDLIST:
+                return generalizeUnorderedListTypes((AUnorderedListType) leftType, (AUnorderedListType) rightType);
+            default:
+                return BuiltinType.ANY;
+        }
+    }
+
+    // Generalizes two record types.
+    private static ARecordType generalizeRecordTypes(ARecordType leftType, ARecordType rightType) {
+        boolean knowsAdditonalFieldNames = true;
+        Set<String> allPossibleAdditionalFieldNames = new HashSet<>();
+        if (leftType.isOpen() && !leftType.knowsAllPossibleAdditonalFieldNames()) {
+            knowsAdditonalFieldNames = false;
+        } else if (leftType.isOpen()) {
+            allPossibleAdditionalFieldNames.addAll(leftType.getAllPossibleAdditonalFieldNames());
+        }
+        if (rightType.isOpen() && !rightType.knowsAllPossibleAdditonalFieldNames()) {
+            knowsAdditonalFieldNames = false;
+        } else if (rightType.isOpen()) {
+            allPossibleAdditionalFieldNames.addAll(rightType.getAllPossibleAdditonalFieldNames());
+        }
+        boolean canBeClosed = !leftType.isOpen() && !rightType.isOpen();
+        List<String> fieldNames = new ArrayList<>();
+        List<IAType> fieldTypes = new ArrayList<>();
+        boolean leftAllMatched =
+                generalizeRecordFields(leftType, rightType, allPossibleAdditionalFieldNames, fieldNames, fieldTypes);
+        boolean rightAllMatched =
+                generalizeRecordFields(rightType, leftType, allPossibleAdditionalFieldNames, fieldNames, fieldTypes);
+        return new ARecordType("generalized-record-type", fieldNames.toArray(new String[fieldNames.size()]),
+                fieldTypes.toArray(new IAType[fieldTypes.size()]), !(canBeClosed && leftAllMatched && rightAllMatched),
+                knowsAdditonalFieldNames ? allPossibleAdditionalFieldNames : null);
+    }
+
+    // Generates closed fields and possible additional fields of a generalized type of two record types.
+    private static boolean generalizeRecordFields(ARecordType leftType, ARecordType rightType,
+            Set<String> allPossibleAdditionalFieldNames, List<String> fieldNames, List<IAType> fieldTypes) {
+        boolean allMatched = true;
+        Set<String> existingFieldNames = new HashSet<>(fieldNames);
+        for (String fieldName : leftType.getFieldNames()) {
+            IAType leftFieldType = leftType.getFieldType(fieldName);
+            IAType rightFieldType = rightType.getFieldType(fieldName);
+            IAType generalizedFieldType =
+                    rightFieldType == null ? null : generalizeTypes(leftFieldType, rightFieldType);
+            if (generalizedFieldType == null || generalizedFieldType.equals(BuiltinType.ANY)) {
+                allPossibleAdditionalFieldNames.add(fieldName);
+                allMatched = false;
+            } else if (!existingFieldNames.contains(fieldName)) {
+                fieldNames.add(fieldName);
+                fieldTypes.add(generalizedFieldType);
+            }
+        }
+        return allMatched;
+    }
+
+    // Generalizes two ordered list types.
+    private static AOrderedListType generalizeOrderedListTypes(AOrderedListType leftType, AOrderedListType rightType) {
+        return new AOrderedListType(processItemType(generalizeTypes(leftType.getItemType(), rightType.getItemType())),
+                "generalized-ordered-list");
+    }
+
+    // Generalizes two unordered list types.
+    private static AUnorderedListType generalizeUnorderedListTypes(AUnorderedListType leftType,
+            AUnorderedListType rightType) {
+        return new AUnorderedListType(processItemType(generalizeTypes(leftType.getItemType(), rightType.getItemType())),
+                "generalized-unordered-list");
+    }
+
+    // A special processing for generalized item types in collections:
+    // a collection cannot maintain an item type of UNION. In this case, the item type has to be ANY.
+    private static IAType processItemType(IAType type) {
+        ATypeTag tag = type.getTypeTag();
+        return tag == ATypeTag.UNION ? BuiltinType.ANY : type;
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastVisitor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastVisitor.java
index b551c88..00c4075 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastVisitor.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastVisitor.java
@@ -105,8 +105,13 @@
         }
         // set the pointer for result
         ATypeTag reqTypeTag = (arg.second).getTypeTag();
-        ATypeTag inputTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                .deserialize(accessor.getByteArray()[accessor.getStartOffset()]);
+        if (reqTypeTag == ATypeTag.ANY) {
+            // for open type case
+            arg.first.set(accessor);
+            return null;
+        }
+        ATypeTag inputTypeTag =
+                EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(accessor.getByteArray()[accessor.getStartOffset()]);
         if (!needPromote(inputTypeTag, reqTypeTag)) {
             arg.first.set(accessor);
         } else {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SwitchCaseComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SwitchCaseComputer.java
index 0eb60cb..22fe89f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SwitchCaseComputer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SwitchCaseComputer.java
@@ -18,10 +18,11 @@
  */
 package org.apache.asterix.om.typecomputer.impl;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.dataflow.data.common.TypeResolverUtil;
 import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -45,31 +46,14 @@
         if (fce.getArguments().size() < 3) {
             throw new AlgebricksException(ERR_MSG);
         }
-
-        IAType currentType = null;
-        boolean any = false;
-        boolean unknownable = false;
         int argSize = fce.getArguments().size();
-        // Checks return types of different branches' return types.
+        List<IAType> types = new ArrayList<>();
+        // Collects different branches' return types.
         // The last return expression is from the ELSE branch and it is optional.
         for (int argIndex = 2; argIndex < argSize; argIndex += (argIndex + 2 == argSize) ? 1 : 2) {
             IAType type = (IAType) env.getType(fce.getArguments().get(argIndex).getValue());
-            ATypeTag typeTag = type.getTypeTag();
-            if (typeTag == ATypeTag.NULL || typeTag == ATypeTag.MISSING) {
-                unknownable = true;
-            } else {
-                if (typeTag == ATypeTag.UNION) {
-                    type = ((AUnionType) type).getActualType();
-                    unknownable = true;
-                }
-                if (currentType != null && !type.equals(currentType)) {
-                    any = true;
-                    break;
-                }
-                currentType = type;
-            }
+            types.add(type);
         }
-        currentType = currentType == null ? BuiltinType.ANULL : currentType;
-        return any ? BuiltinType.ANY : unknownable ? AUnionType.createUnknownableType(currentType) : currentType;
+        return TypeResolverUtil.resolve(types);
     }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
index f2379ef..248ec3a 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
@@ -44,8 +44,8 @@
  */
 public class ARecordType extends AbstractComplexType {
 
-    public static final ARecordType FULLY_OPEN_RECORD_TYPE = new ARecordType("OpenRecord", new String[0], new IAType[0],
-            true);
+    public static final ARecordType FULLY_OPEN_RECORD_TYPE =
+            new ARecordType("OpenRecord", new String[0], new IAType[0], true);
 
     private static final long serialVersionUID = 1L;
     private final String[] fieldNames;
@@ -116,6 +116,14 @@
         }
     }
 
+    public boolean knowsAllPossibleAdditonalFieldNames() {
+        return allPossibleAdditionalFieldNames != null;
+    }
+
+    public Set<String> getAllPossibleAdditonalFieldNames() {
+        return allPossibleAdditionalFieldNames;
+    }
+
     public String[] getFieldNames() {
         return fieldNames;
     }
diff --git a/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/common/TypeResolverUtilTest.java b/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/common/TypeResolverUtilTest.java
new file mode 100644
index 0000000..5303870
--- /dev/null
+++ b/asterixdb/asterix-om/src/test/java/org/apache/asterix/dataflow/data/common/TypeResolverUtilTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.dataflow.data.common;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for TypeResolverUtil.
+ */
+public class TypeResolverUtilTest {
+
+    @Test
+    public void testRecordType() {
+        // Constructs input types.
+        ARecordType leftRecordType = new ARecordType(null, new String[] { "a", "b" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, false, null);
+        ARecordType rightRecordType = new ARecordType(null, new String[] { "b", "c" },
+                new IAType[] { BuiltinType.AINT32, BuiltinType.ABINARY }, false, null);
+
+        // Resolves input types to a generalized type.
+        List<IAType> inputTypes = new ArrayList<>();
+        inputTypes.add(leftRecordType);
+        inputTypes.add(rightRecordType);
+        ARecordType resolvedType = (ARecordType) TypeResolverUtil.resolve(inputTypes);
+
+        // Constructs the expected type.
+        Set<String> possibleAdditionalFields = new HashSet<>();
+        possibleAdditionalFields.add("a");
+        possibleAdditionalFields.add("c");
+        ARecordType expectedType = new ARecordType(null, new String[] { "b" }, new IAType[] { BuiltinType.AINT32 },
+                true, possibleAdditionalFields);
+
+        // Compares the resolved type with the expected type.
+        Assert.assertEquals(resolvedType, expectedType);
+        Assert.assertEquals(resolvedType.getAllPossibleAdditonalFieldNames(),
+                expectedType.getAllPossibleAdditonalFieldNames());
+    }
+
+    @Test
+    public void testIsmophicRecordType() {
+        // Constructs input types.
+        ARecordType leftRecordType = new ARecordType(null, new String[] { "a", "b" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, false, null);
+        ARecordType rightRecordType = new ARecordType(null, new String[] { "b", "a" },
+                new IAType[] { BuiltinType.AINT32, BuiltinType.ASTRING }, false, null);
+
+        // Resolves input types to a generalized type.
+        List<IAType> inputTypes = new ArrayList<>();
+        inputTypes.add(leftRecordType);
+        inputTypes.add(rightRecordType);
+        ARecordType resolvedType = (ARecordType) TypeResolverUtil.resolve(inputTypes);
+
+        // Compares the resolved type with the expected type.
+        Assert.assertEquals(resolvedType, leftRecordType);
+    }
+
+    @Test
+    public void testNestedRecordType() {
+        // Constructs input types.
+        ARecordType leftRecordType =
+                new ARecordType("null", new String[] { "a", "b" },
+                        new IAType[] { BuiltinType.ASTRING,
+                                new ARecordType(null, new String[] { "c", "d" },
+                                        new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, false, null) },
+                        false, null);
+        ARecordType rightRecordType =
+                new ARecordType("null", new String[] { "a", "b" },
+                        new IAType[] { BuiltinType.ASTRING,
+                                new ARecordType(null, new String[] { "d", "e" },
+                                        new IAType[] { BuiltinType.AINT32, BuiltinType.AINT32 }, false, null) },
+                        false, null);
+
+        // Resolves input types to a generalized type.
+        List<IAType> inputTypes = new ArrayList<>();
+        inputTypes.add(leftRecordType);
+        inputTypes.add(rightRecordType);
+        ARecordType resolvedType = (ARecordType) TypeResolverUtil.resolve(inputTypes);
+        ARecordType nestedRecordType = (ARecordType) resolvedType.getFieldType("b");
+
+        // Constructs the expected type.
+        Set<String> nestedPossibleAdditionalFields = new HashSet<>();
+        nestedPossibleAdditionalFields.add("c");
+        nestedPossibleAdditionalFields.add("e");
+        ARecordType expectedType =
+                new ARecordType(null, new String[] { "a", "b" },
+                        new IAType[] { BuiltinType.ASTRING, new ARecordType(null, new String[] { "d" },
+                                new IAType[] { BuiltinType.AINT32 }, true, nestedPossibleAdditionalFields) },
+                        false, null);
+
+        // Compares the resolved type with the expected type.
+        Assert.assertEquals(expectedType, resolvedType);
+        Assert.assertEquals(nestedRecordType.getAllPossibleAdditonalFieldNames(), nestedPossibleAdditionalFields);
+    }
+
+    @Test
+    public void testOrderedListType() {
+        // Constructs input types.
+        ARecordType leftRecordType = new ARecordType("null", new String[] { "a", "b" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, true, Collections.singleton("d"));
+        AOrderedListType leftListType = new AOrderedListType(leftRecordType, "null");
+        ARecordType rightRecordType = new ARecordType("null", new String[] { "b", "c" },
+                new IAType[] { BuiltinType.AINT32, BuiltinType.ABINARY }, true, Collections.singleton("e"));
+        AOrderedListType rightListType = new AOrderedListType(rightRecordType, "null");
+
+        // Gets the actual resolved type.
+        List<IAType> inputTypes = new ArrayList<>();
+        inputTypes.add(leftListType);
+        inputTypes.add(rightListType);
+        AbstractCollectionType resolvedType = (AbstractCollectionType) TypeResolverUtil.resolve(inputTypes);
+        ARecordType resolvedRecordType = (ARecordType) resolvedType.getItemType();
+
+        // Gets the expected generalized type.
+        Set<String> possibleAdditionalFields = new HashSet<>();
+        possibleAdditionalFields.add("a");
+        possibleAdditionalFields.add("c");
+        possibleAdditionalFields.add("d");
+        possibleAdditionalFields.add("e");
+        ARecordType expectedRecordType = new ARecordType(null, new String[] { "b" },
+                new IAType[] { BuiltinType.AINT32 }, true, possibleAdditionalFields);
+        AOrderedListType expectedListType = new AOrderedListType(expectedRecordType, null);
+
+        // Compares the resolved type and the expected type.
+        Assert.assertEquals(resolvedType, expectedListType);
+        Assert.assertEquals(resolvedRecordType.getAllPossibleAdditonalFieldNames(),
+                expectedRecordType.getAllPossibleAdditonalFieldNames());
+    }
+
+    @Test
+    public void testUnorderedListType() {
+        // Constructs input types.
+        ARecordType leftRecordType = new ARecordType(null, new String[] { "a", "b" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, true, Collections.singleton("d"));
+        AUnorderedListType leftListType = new AUnorderedListType(leftRecordType, null);
+        AUnorderedListType rightListType = new AUnorderedListType(BuiltinType.ASTRING, null);
+
+        // Gets the actual resolved type.
+        List<IAType> inputTypes = new ArrayList<>();
+        inputTypes.add(leftListType);
+        inputTypes.add(rightListType);
+        AbstractCollectionType resolvedType = (AbstractCollectionType) TypeResolverUtil.resolve(inputTypes);
+
+        // Compares the resolved type and the expected type.
+        Assert.assertEquals(resolvedType, new AUnorderedListType(BuiltinType.ANY, null));
+    }
+
+    @Test
+    public void testNullType() {
+        ARecordType leftRecordType = new ARecordType(null, new String[] { "a", "b" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, false, null);
+        List<IAType> inputTypes = new ArrayList<>();
+        inputTypes.add(leftRecordType);
+        inputTypes.add(BuiltinType.ANULL);
+        IAType resolvedType = TypeResolverUtil.resolve(inputTypes);
+        Assert.assertEquals(resolvedType, AUnionType.createUnknownableType(leftRecordType));
+    }
+
+    @Test
+    public void testMissingType() {
+        ARecordType leftRecordType = new ARecordType(null, new String[] { "a", "b" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.AINT32 }, false, null);
+        List<IAType> inputTypes = new ArrayList<>();
+        inputTypes.add(leftRecordType);
+        inputTypes.add(BuiltinType.AMISSING);
+        IAType resolvedType = TypeResolverUtil.resolve(inputTypes);
+        Assert.assertEquals(resolvedType, AUnionType.createUnknownableType(leftRecordType));
+    }
+
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeDescriptor.java
index 5c754cf..51b10dc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeDescriptor.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.pointables.base.IVisitablePointable;
 import org.apache.asterix.om.pointables.cast.ACastVisitor;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
@@ -67,8 +68,9 @@
             throw new IllegalStateException(
                     "Invalid types for casting, required type " + reqType + ", input type " + inputType);
         }
-        this.reqType = reqType;
-        this.inputType = inputType;
+        // NULLs and MISSINGs are handled by the generated code, therefore we only need to handle actual types here.
+        this.reqType = TypeComputeUtils.getActualType(reqType);
+        this.inputType = TypeComputeUtils.getActualType(inputType);
     }
 
     @Override
@@ -107,8 +109,8 @@
             throws AlgebricksException {
         try {
             this.argEvaluator = argEvaluator;
-            this.inputPointable = allocateResultPointable(inputType, reqType);
-            this.resultPointable = allocateResultPointable(reqType, inputType);
+            this.inputPointable = allocatePointable(inputType, reqType);
+            this.resultPointable = allocatePointable(reqType, inputType);
             this.arg = new Triple<>(resultPointable, reqType, Boolean.FALSE);
         } catch (AsterixException e) {
             throw new AlgebricksException(e);
@@ -128,7 +130,7 @@
     }
 
     // Allocates the result pointable.
-    private final IVisitablePointable allocateResultPointable(IAType typeForPointable, IAType typeForOtherSide)
+    private final IVisitablePointable allocatePointable(IAType typeForPointable, IAType typeForOtherSide)
             throws AsterixException {
         if (!typeForPointable.equals(BuiltinType.ANY)) {
             return allocator.allocateFieldValue(typeForPointable);
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index 6f28df3..deed53d 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
@@ -60,6 +61,7 @@
     protected IExpressionRuntimeProvider expressionRuntimeProvider;
     protected IExpressionTypeComputer expressionTypeComputer;
     protected IMissableTypeComputer missableTypeComputer;
+    protected IConflictingTypeResolver conflictingTypeResolver;
     protected IExpressionEvalSizeComputer expressionEvalSizeComputer;
     protected IMissingWriterFactory missingWriterFactory;
     protected INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
@@ -232,4 +234,12 @@
         return missableTypeComputer;
     }
 
+    public void setConflictingTypeResolver(IConflictingTypeResolver conflictingTypeResolver) {
+        this.conflictingTypeResolver = conflictingTypeResolver;
+    }
+
+    public IConflictingTypeResolver getConflictingTypeResolver() {
+        return conflictingTypeResolver;
+    }
+
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 09982a0..311aa43 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
@@ -52,11 +53,12 @@
                 IExpressionEvalSizeComputer expressionEvalSizeComputer,
                 IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
                 IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer missableTypeComputer,
-                PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) {
+                IConflictingTypeResolver conflictingTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig,
+                AlgebricksPartitionConstraint clusterLocations) {
             LogicalOperatorPrettyPrintVisitor prettyPrintVisitor = new LogicalOperatorPrettyPrintVisitor();
             return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
                     mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
-                    physicalOptimizationConfig, clusterLocations, prettyPrintVisitor);
+                    conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, prettyPrintVisitor);
         }
     }
 
@@ -78,7 +80,7 @@
                     int varCounter) {
                 final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter,
                         expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
-                        missableTypeComputer, physicalOptimizationConfig, clusterLocations);
+                        missableTypeComputer, conflictingTypeResolver, physicalOptimizationConfig, clusterLocations);
                 oc.setMetadataDeclarations(metadata);
                 final HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc);
                 return new ICompiler() {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IConflictingTypeResolver.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IConflictingTypeResolver.java
new file mode 100644
index 0000000..a6b5701
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IConflictingTypeResolver.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.expressions;
+
+/**
+ * This interface is in charge of resolving conflicting types and returning
+ * a resolved type that conform to all input types in the case there are conflicting
+ * types during type inference. For example, a UNION ALL operator can union a sequence
+ * of integers with a sequence of records, where the two input sequences have
+ * conflicting types.
+ */
+@FunctionalInterface
+public interface IConflictingTypeResolver {
+
+    /**
+     * Resolves conflicting input types and return a type that conforms to every input type.
+     *
+     * @param inputTypes,
+     *            conflicting types that need to be resolved.
+     * @return a (potentially relaxed) type that conform to all input types.
+     */
+    public Object resolve(Object... inputTypes);
+
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
index 865e781..c02eec3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
@@ -70,9 +69,7 @@
 
     @Override
     public void getUsedVariables(Collection<LogicalVariable> vars) {
-        // if (!vars.contains(variable)) {
         vars.add(variable);
-        // }
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java
index 73ee9ad..9defb4f2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnionAllOperator.java
@@ -105,20 +105,31 @@
 
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
-                ctx.getMetadataProvider());
+        IVariableTypeEnvironment env =
+                new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMetadataProvider());
         IVariableTypeEnvironment envLeft = ctx.getOutputTypeEnvironment(inputs.get(0).getValue());
+        IVariableTypeEnvironment envRight = ctx.getOutputTypeEnvironment(inputs.get(1).getValue());
         if (envLeft == null) {
             throw new AlgebricksException("Left input types for union operator are not computed.");
         }
         for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : varMap) {
-            Object t1 = envLeft.getVarType(t.first);
-            if (t1 == null) {
-                throw new AlgebricksException("Failed typing union operator: no type for variable " + t.first);
+            Object typeFromLeft = getType(envLeft, t.first);
+            Object typeFromRight = getType(envRight, t.second);
+            if (typeFromLeft.equals(typeFromRight)) {
+                env.setVarType(t.third, typeFromLeft);
+            } else {
+                env.setVarType(t.third, ctx.getConflictingTypeResolver().resolve(typeFromLeft, typeFromRight));
             }
-            env.setVarType(t.third, t1);
         }
         return env;
     }
 
+    // Gets the type of a variable from an type environment.
+    private Object getType(IVariableTypeEnvironment env, LogicalVariable var) throws AlgebricksException {
+        Object type = env.getVarType(var);
+        if (type == null) {
+            throw new AlgebricksException("Failed typing union operator: no type for variable " + var);
+        }
+        return type;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 3db6af9..7909499 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -19,9 +19,11 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -374,14 +376,17 @@
             return;
         }
         int size = variablesLeft.size();
+        // Keeps track of already matched right side variables.
+        Set<LogicalVariable> matchedRightVars = new HashSet<>();
         for (int i = 0; i < size; i++) {
             ILogicalExpression exprLeft = exprsLeft.get(i).getValue();
             LogicalVariable left = variablesLeft.get(i);
             for (int j = 0; j < size; j++) {
                 ILogicalExpression exprRight = copyExpressionAndSubtituteVars(exprsRight.get(j)).getValue();
-                if (exprLeft.equals(exprRight)) {
-                    LogicalVariable right = variablesRight.get(j);
+                LogicalVariable right = variablesRight.get(j);
+                if (exprLeft.equals(exprRight) && !matchedRightVars.contains(right)) {
                     variableMapping.put(right, left);
+                    matchedRightVars.add(right); // The added variable will not be considered in next rounds.
                     break;
                 }
             }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 7077014..ce8a704 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -129,11 +129,12 @@
                     public Pair<Boolean, IPartitioningProperty> coordinateRequirements(
                             IPartitioningProperty requirements, IPartitioningProperty firstDeliveredPartitioning,
                             ILogicalOperator op, IOptimizationContext context) throws AlgebricksException {
-                        if (firstDeliveredPartitioning != null && firstDeliveredPartitioning
+                        if (firstDeliveredPartitioning != null && requirements != null && firstDeliveredPartitioning
                                 .getPartitioningType() == requirements.getPartitioningType()) {
                             switch (requirements.getPartitioningType()) {
                                 case UNORDERED_PARTITIONED: {
-                                    UnorderedPartitionedProperty upp1 = (UnorderedPartitionedProperty) firstDeliveredPartitioning;
+                                    UnorderedPartitionedProperty upp1 =
+                                            (UnorderedPartitionedProperty) firstDeliveredPartitioning;
                                     Set<LogicalVariable> set1 = upp1.getColumnSet();
                                     UnorderedPartitionedProperty uppreq = (UnorderedPartitionedProperty) requirements;
                                     Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
@@ -142,8 +143,8 @@
                                     Set<LogicalVariable> keysCurrent = uppreq.getColumnSet();
                                     List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent))
                                             ? keysRightBranch : keysLeftBranch;
-                                    List<LogicalVariable> keysSecond = keysFirst == keysRightBranch ? keysLeftBranch
-                                            : keysRightBranch;
+                                    List<LogicalVariable> keysSecond =
+                                            keysFirst == keysRightBranch ? keysLeftBranch : keysRightBranch;
                                     for (LogicalVariable r : uppreq.getColumnSet()) {
                                         EquivalenceClass ecSnd = eqmap.get(r);
                                         boolean found = false;
@@ -177,8 +178,8 @@
                                                 + " to agree with partitioning property " + firstDeliveredPartitioning
                                                 + " delivered by previous input operator.");
                                     }
-                                    UnorderedPartitionedProperty upp2 = new UnorderedPartitionedProperty(modifuppreq,
-                                            requirements.getNodeDomain());
+                                    UnorderedPartitionedProperty upp2 =
+                                            new UnorderedPartitionedProperty(modifuppreq, requirements.getNodeDomain());
                                     return new Pair<Boolean, IPartitioningProperty>(false, upp2);
                                 }
                                 case ORDERED_PARTITIONED: {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index 823294e..d70c67d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -29,9 +30,10 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty.PartitioningType;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
@@ -56,7 +58,6 @@
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
         IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty();
         this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<ILocalStructuralProperty>(0));
-
     }
 
     @Override
@@ -65,7 +66,8 @@
         StructuralPropertiesVector pv0 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
         StructuralPropertiesVector pv1 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
         return new PhysicalRequirements(new StructuralPropertiesVector[] { pv0, pv1 },
-                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+                (requirements, firstDeliveredPartitioning, operator, ctx) -> this.coordinateRequirements(requirements,
+                        firstDeliveredPartitioning));
     }
 
     @Override
@@ -74,7 +76,8 @@
             throws AlgebricksException {
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        RecordDescriptor recordDescriptor =
+                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
 
         // at algebricks level, union all only accepts two inputs, although at
         // hyracks
@@ -91,4 +94,21 @@
     public boolean expensiveThanMaterialization() {
         return false;
     }
+
+    // This method implements how inputs' partitioning properties are coordinated.
+    // The partitioning property of the first input branch is kept unchanged.
+    // A random partitioning property is required for the second branch and the node domain of the first input branch
+    // will be used.
+    private Pair<Boolean, IPartitioningProperty> coordinateRequirements(IPartitioningProperty requirements,
+            IPartitioningProperty firstDeliveredPartitioning) throws AlgebricksException {
+        if (firstDeliveredPartitioning == null) {
+            return new Pair<>(true, requirements);
+        }
+        PartitioningType partType = firstDeliveredPartitioning.getPartitioningType();
+        if (partType == PartitioningType.UNPARTITIONED) {
+            return new Pair<>(true, firstDeliveredPartitioning);
+        } else {
+            return new Pair<>(true, new RandomPartitioningProperty(firstDeliveredPartitioning.getNodeDomain()));
+        }
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
index 1e5e205..f4f5d7f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
@@ -52,7 +52,7 @@
         public Pair<Boolean, IPartitioningProperty> coordinateRequirements(IPartitioningProperty rqdpp,
                 IPartitioningProperty firstDeliveredPartitioning, ILogicalOperator op, IOptimizationContext context)
                 throws AlgebricksException {
-            if (firstDeliveredPartitioning != null
+            if (firstDeliveredPartitioning != null && rqdpp != null
                     && firstDeliveredPartitioning.getPartitioningType() == rqdpp.getPartitioningType()) {
                 switch (rqdpp.getPartitioningType()) {
                     case UNORDERED_PARTITIONED: {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/ITypingContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/ITypingContext.java
index 7b77083..fad6b59 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/ITypingContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/ITypingContext.java
@@ -20,24 +20,74 @@
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMissableTypeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 
+/**
+ * Implementations of this interface are supposed to be in charge of type inferences
+ * during query compilations.
+ */
 public interface ITypingContext {
+
+    /**
+     * Gets the type environment from the output perspective of the argument operator.
+     *
+     * @param op,
+     *            the operator of interests.
+     * @return the type environment after the operator's processing.
+     */
     public IVariableTypeEnvironment getOutputTypeEnvironment(ILogicalOperator op);
 
+    /**
+     * Sets the output type environment of an operator.
+     *
+     * @param op,
+     *            the operator of interests.
+     * @param env,
+     *            the type environment after the operator's processing.
+     */
     public void setOutputTypeEnvironment(ILogicalOperator op, IVariableTypeEnvironment env);
 
+    /**
+     * @return the type computer for expressions.
+     */
     public IExpressionTypeComputer getExpressionTypeComputer();
 
+    /**
+     * @return a type computer for "missable" types, e.g.,
+     *         the resulting types for variables populated from the right input branch of
+     *         a left outer join.
+     */
     public IMissableTypeComputer getMissableTypeComputer();
 
+    /**
+     * @return a resolver for conflicting types.
+     */
+    public IConflictingTypeResolver getConflictingTypeResolver();
+
+    /**
+     * @return the metadata provider, which is in charge of metadata reads/writes.
+     */
     public IMetadataProvider<?, ?> getMetadataProvider();
 
+    /**
+     * Invalidates a type environment for an operator.
+     *
+     * @param op,
+     *            the operator of interests.
+     */
     public void invalidateTypeEnvironmentForOperator(ILogicalOperator op);
 
+    /**
+     * (Re-)computes and sets a type environment for an operator.
+     *
+     * @param op
+     *            the operator of interests.
+     * @throws AlgebricksException
+     */
     public void computeAndSetTypeEnvironmentForOperator(ILogicalOperator op) throws AlgebricksException;
 
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 86f61ad..a1b3556 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
@@ -43,6 +44,9 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 
+/**
+ * The Algebricks default implementation for IOptimizationContext.
+ */
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class AlgebricksOptimizationContext implements IOptimizationContext {
 
@@ -52,7 +56,7 @@
     private final PhysicalOptimizationConfig physicalOptimizationConfig;
     private final IVariableEvalSizeEnvironment varEvalSizeEnv = new IVariableEvalSizeEnvironment() {
 
-        Map<LogicalVariable, Integer> varSizeMap = new HashMap<LogicalVariable, Integer>();
+        Map<LogicalVariable, Integer> varSizeMap = new HashMap<>();
 
         @Override
         public void setVariableEvalSize(LogicalVariable var, int size) {
@@ -65,38 +69,40 @@
         }
     };
 
-    private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<ILogicalOperator, IVariableTypeEnvironment>();
+    private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<>();
 
-    private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<ILogicalOperator, HashSet<ILogicalOperator>>();
-    private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<IAlgebraicRewriteRule, HashSet<ILogicalOperator>>();
-    private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>();
+    private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<>();
+    private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<>();
+    private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<>();
 
     private IMetadataProvider metadataProvider;
-    private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<LogicalVariable>();
+    private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<>();
 
-    protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<ILogicalOperator, List<FunctionalDependency>>();
-    protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>>();
+    protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<>();
+    protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<>();
 
-    protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>();
+    protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<>();
     private final IExpressionTypeComputer expressionTypeComputer;
     private final IMissableTypeComputer nullableTypeComputer;
     private final INodeDomain defaultNodeDomain;
     private final LogicalOperatorPrettyPrintVisitor prettyPrintVisitor;
+    private final IConflictingTypeResolver conflictingTypeResovler;
 
     public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
-            IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer nullableTypeComputer,
-            PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) {
+            IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer missableTypeComputer,
+            IConflictingTypeResolver conflictingTypeResovler, PhysicalOptimizationConfig physicalOptimizationConfig,
+            AlgebricksPartitionConstraint clusterLocations) {
         this(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
-                nullableTypeComputer, physicalOptimizationConfig, clusterLocations,
+                missableTypeComputer, conflictingTypeResovler, physicalOptimizationConfig, clusterLocations,
                 new LogicalOperatorPrettyPrintVisitor());
     }
 
     public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer nullableTypeComputer,
-            PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations,
-            LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
+            IConflictingTypeResolver conflictingTypeResovler, PhysicalOptimizationConfig physicalOptimizationConfig,
+            AlgebricksPartitionConstraint clusterLocations, LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
         this.varCounter = varCounter;
         this.expressionEvalSizeComputer = expressionEvalSizeComputer;
         this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
@@ -105,6 +111,7 @@
         this.physicalOptimizationConfig = physicalOptimizationConfig;
         this.defaultNodeDomain = new DefaultNodeGroupDomain(clusterLocations);
         this.prettyPrintVisitor = prettyPrintVisitor;
+        this.conflictingTypeResovler = conflictingTypeResovler;
     }
 
     @Override
@@ -120,8 +127,7 @@
     @Override
     public LogicalVariable newVar() {
         varCounter++;
-        LogicalVariable var = new LogicalVariable(varCounter);
-        return var;
+        return new LogicalVariable(varCounter);
     }
 
     @Override
@@ -148,7 +154,7 @@
     public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
         HashSet<ILogicalOperator> operators = dontApply.get(rule);
         if (operators == null) {
-            HashSet<ILogicalOperator> os = new HashSet<ILogicalOperator>();
+            HashSet<ILogicalOperator> os = new HashSet<>();
             os.add(op);
             dontApply.put(rule, os);
         } else {
@@ -164,7 +170,7 @@
     public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2) {
         HashSet<ILogicalOperator> ops = alreadyCompared.get(op1);
         if (ops == null) {
-            HashSet<ILogicalOperator> newEntry = new HashSet<ILogicalOperator>();
+            HashSet<ILogicalOperator> newEntry = new HashSet<>();
             newEntry.add(op2);
             alreadyCompared.put(op1, newEntry);
             return false;
@@ -203,10 +209,7 @@
     @Override
     public List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar) {
         FunctionalDependency fd = varToPrimaryKey.get(recordVar);
-        if (fd == null) {
-            return null;
-        }
-        return new ArrayList<LogicalVariable>(fd.getHead());
+        return fd == null ? null : new ArrayList<>(fd.getHead());
     }
 
     @Override
@@ -299,7 +302,7 @@
     public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars) {
         for (Map.Entry<LogicalVariable, FunctionalDependency> me : varToPrimaryKey.entrySet()) {
             FunctionalDependency fd = me.getValue();
-            List<LogicalVariable> hd = new ArrayList<LogicalVariable>();
+            List<LogicalVariable> hd = new ArrayList<>();
             for (LogicalVariable v : fd.getHead()) {
                 LogicalVariable v2 = mappedVars.get(v);
                 if (v2 == null) {
@@ -308,7 +311,7 @@
                     hd.add(v2);
                 }
             }
-            List<LogicalVariable> tl = new ArrayList<LogicalVariable>();
+            List<LogicalVariable> tl = new ArrayList<>();
             for (LogicalVariable v : fd.getTail()) {
                 LogicalVariable v2 = mappedVars.get(v);
                 if (v2 == null) {
@@ -330,4 +333,9 @@
     public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor() {
         return prettyPrintVisitor;
     }
+
+    @Override
+    public IConflictingTypeResolver getConflictingTypeResolver() {
+        return conflictingTypeResovler;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
index 0e5cf9c..6ceed1e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
@@ -20,6 +20,7 @@
 
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
@@ -30,5 +31,6 @@
             IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer missableTypeComputer,
-            PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations);
+            IConflictingTypeResolver conflictintTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig,
+            AlgebricksPartitionConstraint clusterLocations);
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 35d16a9..d07544b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -68,8 +68,10 @@
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty.PartitioningType;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
@@ -81,8 +83,6 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty.PartitioningType;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -124,9 +124,9 @@
 
         PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(op, context);
 
-        StructuralPropertiesVector pvector = new StructuralPropertiesVector(
-                new RandomPartitioningProperty(context.getComputationNodeDomain()),
-                new LinkedList<ILocalStructuralProperty>());
+        StructuralPropertiesVector pvector =
+                new StructuralPropertiesVector(new RandomPartitioningProperty(context.getComputationNodeDomain()),
+                        new LinkedList<ILocalStructuralProperty>());
         boolean changed = physOptimizeOp(opRef, pvector, false, context);
         op.computeDeliveredPhysicalProperties(context);
         AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Structural properties for " + op.getPhysicalOperator() + ": "
@@ -162,7 +162,7 @@
             reqdProperties = pr.getRequiredProperties();
         }
 
-        List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<IPartitioningProperty>();
+        List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<>();
         for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
             AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
             deliveredPartitioningPropertiesFromChildren
@@ -253,8 +253,8 @@
             Pair<Boolean, IPartitioningProperty> pbpp = prc.coordinateRequirements(
                     requiredProperty.getPartitioningProperty(), firstDeliveredPartitioning, op, context);
             boolean mayExpandPartitioningProperties = pbpp.first;
-            IPhysicalPropertiesVector rqd = new StructuralPropertiesVector(pbpp.second,
-                    requiredProperty.getLocalProperties());
+            IPhysicalPropertiesVector rqd =
+                    new StructuralPropertiesVector(pbpp.second, requiredProperty.getLocalProperties());
 
             AlgebricksConfig.ALGEBRICKS_LOGGER
                     .finest(">>>> Required properties for " + child.getPhysicalOperator() + ": " + rqd + "\n");
@@ -271,13 +271,12 @@
                 changed = true;
                 addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context);
 
-                AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(childIndex)
-                        .getValue());
+                AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
 
                 if (newChild != child) {
                     delivered = newChild.getDeliveredPhysicalProperties();
-                    IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild, rqd,
-                            mayExpandPartitioningProperties, context);
+                    IPhysicalPropertiesVector newDiff =
+                            newPropertiesDiff(newChild, rqd, mayExpandPartitioningProperties, context);
                     AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties diff: " + newDiff + "\n");
 
                     if (isRedundantSort(opRef, delivered, newDiff, context)) {
@@ -288,11 +287,7 @@
             }
 
             if (firstDeliveredPartitioning == null) {
-                IPartitioningProperty dpp = delivered.getPartitioningProperty();
-                if (dpp.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED
-                        || dpp.getPartitioningType() == PartitioningType.UNORDERED_PARTITIONED) {
-                    firstDeliveredPartitioning = dpp;
-                }
+                firstDeliveredPartitioning = delivered.getPartitioningProperty();
             }
         }
 
@@ -437,8 +432,8 @@
         IPartitioningProperty pp = diffPropertiesVector.getPartitioningProperty();
         if (pp == null || pp.getPartitioningType() == PartitioningType.UNPARTITIONED) {
             addLocalEnforcers(op, childIndex, diffPropertiesVector.getLocalProperties(), nestedPlan, context);
-            IPhysicalPropertiesVector deliveredByNewChild = ((AbstractLogicalOperator) op.getInputs().get(0).getValue())
-                    .getDeliveredPhysicalProperties();
+            IPhysicalPropertiesVector deliveredByNewChild =
+                    ((AbstractLogicalOperator) op.getInputs().get(0).getValue()).getDeliveredPhysicalProperties();
             addPartitioningEnforcers(op, childIndex, pp, required, deliveredByNewChild, domain, context);
         } else {
             addPartitioningEnforcers(op, childIndex, pp, required, deliveredByChild, pp.getNodeDomain(), context);
@@ -474,8 +469,8 @@
                 }
                 case LOCAL_GROUPING_PROPERTY: {
                     LocalGroupingProperty g = (LocalGroupingProperty) prop;
-                    Collection<LogicalVariable> vars = (g.getPreferredOrderEnforcer() != null)
-                            ? g.getPreferredOrderEnforcer() : g.getColumnSet();
+                    Collection<LogicalVariable> vars =
+                            (g.getPreferredOrderEnforcer() != null) ? g.getPreferredOrderEnforcer() : g.getColumnSet();
                     List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
                     for (LogicalVariable v : vars) {
                         OrderColumn oc = new OrderColumn(v, OrderKind.ASC);
@@ -502,7 +497,7 @@
     private Mutable<ILogicalOperator> enforceOrderProperties(List<LocalOrderProperty> oList,
             Mutable<ILogicalOperator> topOp, boolean isMicroOp, IOptimizationContext context)
             throws AlgebricksException {
-        List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<>();
         for (LocalOrderProperty orderProperty : oList) {
             for (OrderColumn oc : orderProperty.getOrderColumns()) {
                 IOrder ordType = (oc.getOrder() == OrderKind.ASC) ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER;
@@ -538,8 +533,8 @@
                         pop = new RandomMergeExchangePOperator();
                     } else {
                         if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
-                            IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
-                                    .get(OperatorAnnotations.USE_RANGE_CONNECTOR);
+                            IRangeMap rangeMap =
+                                    (IRangeMap) op.getAnnotations().get(OperatorAnnotations.USE_RANGE_CONNECTOR);
                             pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap);
                         } else {
                             OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
@@ -550,8 +545,7 @@
                     break;
                 }
                 case UNORDERED_PARTITIONED: {
-                    List<LogicalVariable> varList = new ArrayList<LogicalVariable>(
-                            ((UnorderedPartitionedProperty) pp).getColumnSet());
+                    List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet());
                     List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
                     List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
                     boolean propWasSet = false;
@@ -561,8 +555,8 @@
                         Map<LogicalVariable, EquivalenceClass> ecs = context.getEquivalenceClassMap(c);
                         List<FunctionalDependency> fds = context.getFDList(c);
                         if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
-                            List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals,
-                                    cldLocals);
+                            List<OrderColumn> orderColumns =
+                                    getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals);
                             pop = new HashPartitionMergeExchangePOperator(orderColumns, varList, domain);
                             propWasSet = true;
                         }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
index 06f5c35..2772d8d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
@@ -23,7 +23,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -42,34 +41,24 @@
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
-        return false;
-    }
-
-    /**
-     * When the input schema to WriteOperator is different from the output
-     * schema in terms of variable order, add a project operator to get the
-     * write order
-     */
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
             return false;
         }
         UnionAllOperator opUnion = (UnionAllOperator) op;
         List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = opUnion.getVariableMappings();
-        ArrayList<LogicalVariable> usedVariablesFromOne = new ArrayList<LogicalVariable>();
-        ArrayList<LogicalVariable> usedVariablesFromTwo = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> usedVariablesFromOne = new ArrayList<>();
+        ArrayList<LogicalVariable> usedVariablesFromTwo = new ArrayList<>();
 
         for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varMap) {
             usedVariablesFromOne.add(triple.first);
             usedVariablesFromTwo.add(triple.second);
         }
 
-        ArrayList<LogicalVariable> inputSchemaOne = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> inputSchemaOne = new ArrayList<>();
         VariableUtilities.getLiveVariables(opUnion.getInputs().get(0).getValue(), inputSchemaOne);
 
-        ArrayList<LogicalVariable> inputSchemaTwo = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> inputSchemaTwo = new ArrayList<>();
         VariableUtilities.getLiveVariables(opUnion.getInputs().get(1).getValue(), inputSchemaTwo);
 
         boolean rewritten = false;
@@ -105,8 +94,9 @@
         for (int i = 0; i < finalSchemaSize; i++) {
             LogicalVariable var1 = finalSchema.get(i);
             LogicalVariable var2 = inputSchema.get(i);
-            if (!var1.equals(var2))
+            if (!var1.equals(var2)) {
                 return false;
+            }
         }
         return true;
     }