[ASTERIXDB-3579][COMP] Unnest as many Unnest Operators as possible

Ext-ref:MB-65823

Change-Id: Ic1c305b6795ae312ff4d61948b32fb8a0974e7ba
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19526
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: <murali.krishna@couchbase.com>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: <preethampoluparthi@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
index 7a3b299..148ace9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
@@ -91,6 +91,7 @@
 
     // for the Array UNNEST optimization. The main list is for each leafInput.
     private List<List<List<ILogicalOperator>>> unnestOpsInfo;
+    private boolean arrayUnnestPossible = false;
     // The Distinct operators for each DataSourceScan operator (if applicable)
     private HashMap<DataSourceScanOperator, ILogicalOperator> dataScanAndGroupByDistinctOps;
 
@@ -102,7 +103,6 @@
 
     private List<LogicalVariable> resultAndJoinVars = new ArrayList();
     private final List<Boolean> realLeafInputs = new ArrayList();
-    private boolean arrayUnnestPossible = true;
     private int numberOfFromTerms;
 
     private List<Triple<ILogicalOperator, ILogicalOperator, List<ILogicalOperator>>> modifyUnnestInfo;
@@ -524,60 +524,84 @@
 
     private void findUnnestOps(ILogicalOperator leafInput) throws AlgebricksException {
         ILogicalOperator p = leafInput;
+        realLeafInputs.add(true); // every leafInput wil be real
         List<ILogicalOperator> unnestOps = findAllUnnestOps(p); // how many and which ones
+        if (unnestOps.isEmpty()) {
+            unnestOpsInfo.add(new ArrayList<>());
+            return;
+        }
+        List<List<ILogicalOperator>> bigList = new ArrayList<>();
+        int count = 0;
         for (ILogicalOperator op : unnestOps) {
             UnnestOperator unnestOp = (UnnestOperator) op;
             if (anyVarIsAJoinVar(unnestOp.getVariables())) {
                 unnestOpsInfo.add(new ArrayList<>()); // each leafInput must have one entry
-                arrayUnnestPossible = false; // If these variables participate in join predicates, then unnestOps cannot be moved above joins
+                //not possible to unnest this unnest op. If these variables participate in join predicates, then unnestOps cannot be moved above joins
+                continue;
+            }
+
+            Pair<Boolean, List<ILogicalOperator>> info = findAllAssociatedAssingOps(p, unnestOp);
+            if (!info.first) {// something 'bad' happened, so this unnestOp cannot be unnested
+                unnestOpsInfo.add(new ArrayList<>()); // each leafInput must have one entry
+                //not possible to unnest this unnest op. If these variables participate in join predicates, then unnestOps cannot be moved above joins
+                continue;
+            }
+            count++; // found something unnestable
+            bigList.add(info.second);
+        }
+        if (count > 0) {
+            arrayUnnestPossible = true;
+            unnestOpsInfo.add(bigList);
+            for (int i = 0; i < count; i++) {
+                unnestOpsInfo.add(new ArrayList<>()); // this is for the fake leaf Input that will be added for every unnestOp
+                realLeafInputs.add(false);
             }
         }
-        List<List<ILogicalOperator>> bigList = new ArrayList<>();
-        realLeafInputs.add(true);
-        for (int i = 0; i < unnestOps.size(); i++) {
-            List<ILogicalOperator> ops = new ArrayList<>(); //Gather all AssignsOps, if any, associated wth this unnestOp
-            UnnestOperator unnestOp = (UnnestOperator) unnestOps.get(i);
+    }
 
-            while (p != null && p.getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
-                if (p.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
-                    AssignOperator aOp = (AssignOperator) p;
+    Pair<Boolean, List<ILogicalOperator>> findAllAssociatedAssingOps(ILogicalOperator leafInput,
+            UnnestOperator unnestOp) throws AlgebricksException {
+        Pair<Boolean, List<ILogicalOperator>> info = new Pair<>(true, new ArrayList<>());
+        ILogicalOperator p = leafInput;
 
-                    ILogicalExpression a = aOp.getExpressions().get(0).getValue();
-                    if (a.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
-                        AbstractFunctionCallExpression exp =
-                                (AbstractFunctionCallExpression) aOp.getExpressions().get(0).getValue();
-                        if (exp.getKind() == AbstractFunctionCallExpression.FunctionKind.SCALAR) {
-                            ILogicalExpression lexp = exp.getArguments().get(0).getValue();
-                            if (lexp.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                                VariableReferenceExpression varRef = (VariableReferenceExpression) lexp;
-                                LogicalVariable var = varRef.getVariableReference();
-                                LogicalVariable unnestVar = unnestOp.getVariables().get(0);
-                                if (unnestVar == var) {
-                                    if ((anyVarIsAJoinVar(aOp.getVariables())
-                                            || assignVarPresentInLeafInput(aOp, leafInput))) {
-                                        unnestOpsInfo.add(new ArrayList<>());
-                                        arrayUnnestPossible = false;
-                                    } else {
-                                        ops.add(aOp);
-                                    }
+        List<ILogicalOperator> ops = new ArrayList<>(); //Gather all AssignsOps, if any, associated wth this unnestOp
+        while (p != null && p.getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
+            if (p.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+                AssignOperator aOp = (AssignOperator) p;
+
+                ILogicalExpression a = aOp.getExpressions().get(0).getValue();
+                if (a.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                    AbstractFunctionCallExpression exp =
+                            (AbstractFunctionCallExpression) aOp.getExpressions().get(0).getValue();
+                    if (exp.getKind() == AbstractFunctionCallExpression.FunctionKind.SCALAR) {
+                        ILogicalExpression lexp = exp.getArguments().get(0).getValue();
+                        if (lexp.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                            VariableReferenceExpression varRef = (VariableReferenceExpression) lexp;
+                            LogicalVariable var = varRef.getVariableReference();
+                            LogicalVariable unnestVar = unnestOp.getVariables().get(0);
+                            if (unnestVar == var) {
+                                if ((anyVarIsAJoinVar(aOp.getVariables())
+                                        || assignVarPresentInLeafInput(aOp, leafInput))) {
+                                    info.first = false;
+                                    return info;
+                                } else {
+                                    ops.add(aOp);
                                 }
                             }
                         }
                     }
                 }
-                p = p.getInputs().get(0).getValue();
             }
-            ops.add(unnestOp); // the unnestOp will be the last (and may be the only op)
-            bigList.add(ops);
+            p = p.getInputs().get(0).getValue();
         }
+        ops.add(unnestOp); // the unnestOp will be the last (and may be the only op)
+        info.second = ops;
+
+        /*
         unnestOpsInfo.add(bigList); // one for each LeafInput. If empty, means that there are no array references in this leafInout
         // also need to add some dummy entries for the fake leafInputs. Add as many as unnestOps. This will make the code in setCardsAndSizes happy
-
-        for (ILogicalOperator q : unnestOps) {
-            bigList = new ArrayList<>();
-            unnestOpsInfo.add(bigList);
-            realLeafInputs.add(false);
-        }
+         */
+        return info;
     }
 
     private boolean assignVarPresentInLeafInput(AssignOperator aOp, ILogicalOperator leafInput)