[ASTERIXDB-2244][RT] Implement micro union-all operator

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- implement support for binary micro operators in subplans
- implement micro union-all operator
- fix free variables visitor

Change-Id: I11be926f175889978c144dd4483ec565d3d86e2d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2277
Reviewed-by: Till Westmann <tillw@apache.org>
Contrib: Till Westmann <tillw@apache.org>
Integration-Tests: Till Westmann <tillw@apache.org>
Tested-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 5b6285a..d277043 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -67,6 +67,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.LeftOuterUnnestPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreSortedDistinctByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroUnionAllPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
@@ -200,11 +201,11 @@
                     break;
                 }
                 case INNERJOIN: {
-                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
+                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, context);
                     break;
                 }
                 case LEFTOUTERJOIN: {
-                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context);
+                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, topLevelOp, context);
                     break;
                 }
                 case LIMIT: {
@@ -259,11 +260,19 @@
                     break;
                 }
                 case UNIONALL: {
-                    op.setPhysicalOperator(new UnionAllPOperator());
+                    if (topLevelOp) {
+                        op.setPhysicalOperator(new UnionAllPOperator());
+                    } else {
+                        op.setPhysicalOperator(new MicroUnionAllPOperator());
+                    }
                     break;
                 }
                 case INTERSECT: {
-                    op.setPhysicalOperator(new IntersectPOperator());
+                    if (topLevelOp) {
+                        op.setPhysicalOperator(new IntersectPOperator());
+                    } else {
+                        throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag());
+                    }
                     break;
                 }
                 case UNNEST: {
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index 9d3b311..3efa46b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -139,6 +139,10 @@
                 while (upperSubplanRootRefIterator.hasNext()) {
                     Mutable<ILogicalOperator> rootOpRef = upperSubplanRootRefIterator.next();
 
+                    if (downToNts(rootOpRef) == null) {
+                        continue;
+                    }
+
                     // Collects free variables in the root operator of a nested plan and its descent.
                     Set<LogicalVariable> freeVars = new ListSet<>();
                     OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) rootOpRef.getValue(),
@@ -154,6 +158,9 @@
                             // Sets the nts for a original subplan.
                             Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlan.getRoots().get(rootIndex);
                             Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef);
+                            if (originalGbyNtsRef == null) {
+                                continue;
+                            }
                             NestedTupleSourceOperator originalNts =
                                     (NestedTupleSourceOperator) originalGbyNtsRef.getValue();
                             originalNts.setDataSourceReference(new MutableObject<>(gby));
@@ -265,11 +272,13 @@
     }
 
     private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator> opRef) {
-        Mutable<ILogicalOperator> currentOpRef = opRef;
-        while (currentOpRef.getValue().getInputs().size() > 0) {
-            currentOpRef = currentOpRef.getValue().getInputs().get(0);
+        List<Mutable<ILogicalOperator>> leafOps = OperatorManipulationUtil.findLeafDescendantsOrSelf(opRef);
+        if (leafOps.size() == 1) {
+            Mutable<ILogicalOperator> leafOp = leafOps.get(0);
+            if (leafOp.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+                return leafOp;
+            }
         }
-        return currentOpRef;
+        return null;
     }
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 6efda52..0bc2a5e 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -51,8 +51,11 @@
     private JoinUtils() {
     }
 
-    public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context)
-            throws AlgebricksException {
+    public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, boolean topLevelOp,
+            IOptimizationContext context) throws AlgebricksException {
+        if (!topLevelOp) {
+            throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag());
+        }
         List<LogicalVariable> sideLeft = new LinkedList<>();
         List<LogicalVariable> sideRight = new LinkedList<>();
         List<LogicalVariable> varsLeft = op.getInputs().get(0).getValue().getSchema();