[NO ISSUE][COMP] Remove listify() when iterating over aggregate

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Optimizer must remove listify() when iterating
  over an aggregate function that returns a list

Change-Id: I7a8919595be7e4a44a5816ac7f281842d03ecc1f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3556
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java
index aacffbf..c0d6f82 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AqlExpressionToPlanTranslator.java
@@ -82,17 +82,17 @@
         LogicalVariable v = context.newVarFromExpression(fc.getVarExpr());
         Expression inExpr = fc.getInExpr();
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = langExprToAlgExpression(inExpr, tupSource);
+        Pair<ILogicalExpression, Mutable<ILogicalOperator>> pUnnestExpr = makeUnnestExpression(eo.first, eo.second);
         ILogicalOperator returnedOp;
-
         if (fc.getPosVarExpr() == null) {
-            returnedOp = new UnnestOperator(v, new MutableObject<ILogicalExpression>(makeUnnestExpression(eo.first)));
+            returnedOp = new UnnestOperator(v, new MutableObject<>(pUnnestExpr.first));
         } else {
             LogicalVariable pVar = context.newVarFromExpression(fc.getPosVarExpr());
             // We set the positional variable type as INT64 type.
-            returnedOp = new UnnestOperator(v, new MutableObject<ILogicalExpression>(makeUnnestExpression(eo.first)),
-                    pVar, BuiltinType.AINT64, new PositionWriter());
+            returnedOp = new UnnestOperator(v, new MutableObject<>(pUnnestExpr.first), pVar, BuiltinType.AINT64,
+                    new PositionWriter());
         }
-        returnedOp.getInputs().add(eo.second);
+        returnedOp.getInputs().add(pUnnestExpr.second);
         return new Pair<>(returnedOp, v);
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index c6c8a2f..adc6853 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -1274,9 +1274,11 @@
         for (QuantifiedPair qt : qe.getQuantifiedList()) {
             Expression expr = qt.getExpr();
             Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo1 = langExprToAlgExpression(expr, topOp);
-            topOp = eo1.second;
+            Pair<ILogicalExpression, Mutable<ILogicalOperator>> pUnnestExpr =
+                    makeUnnestExpression(eo1.first, eo1.second);
+            topOp = pUnnestExpr.second;
             LogicalVariable uVar = context.newVarFromExpression(qt.getVarExpr());
-            UnnestOperator u = new UnnestOperator(uVar, new MutableObject<>(makeUnnestExpression(eo1.first)));
+            UnnestOperator u = new UnnestOperator(uVar, new MutableObject<>(pUnnestExpr.first));
             u.setSourceLocation(expr.getSourceLocation());
 
             if (firstOp == null) {
@@ -1692,32 +1694,52 @@
         return array;
     }
 
-    protected ILogicalExpression makeUnnestExpression(ILogicalExpression expr) {
+    protected Pair<ILogicalExpression, Mutable<ILogicalOperator>> makeUnnestExpression(ILogicalExpression expr,
+            Mutable<ILogicalOperator> topOpRef) throws CompilationException {
         SourceLocation sourceLoc = expr.getSourceLocation();
-        List<Mutable<ILogicalExpression>> argRefs = new ArrayList<>();
-        argRefs.add(new MutableObject<>(expr));
         switch (expr.getExpressionTag()) {
             case CONSTANT:
             case VARIABLE:
                 UnnestingFunctionCallExpression scanCollExpr1 = new UnnestingFunctionCallExpression(
-                        FunctionUtil.getFunctionInfo(BuiltinFunctions.SCAN_COLLECTION), argRefs);
+                        FunctionUtil.getFunctionInfo(BuiltinFunctions.SCAN_COLLECTION),
+                        mkSingletonArrayList(new MutableObject<>(expr)));
                 scanCollExpr1.setSourceLocation(sourceLoc);
-                return scanCollExpr1;
+                return new Pair<>(scanCollExpr1, topOpRef);
             case FUNCTION_CALL:
                 AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
                 if (fce.getKind() == FunctionKind.UNNEST) {
-                    return expr;
-                } else {
+                    return new Pair<>(expr, topOpRef);
+                } else if (fce.getKind() == FunctionKind.SCALAR && unnestNeedsAssign(fce)) {
+                    LogicalVariable var = context.newVar();
+                    AssignOperator assignOp = new AssignOperator(var, new MutableObject<>(expr));
+                    assignOp.setSourceLocation(sourceLoc);
+                    assignOp.getInputs().add(topOpRef);
+                    VariableReferenceExpression varRef = new VariableReferenceExpression(var);
+                    varRef.setSourceLocation(sourceLoc);
                     UnnestingFunctionCallExpression scanCollExpr2 = new UnnestingFunctionCallExpression(
-                            FunctionUtil.getFunctionInfo(BuiltinFunctions.SCAN_COLLECTION), argRefs);
+                            FunctionUtil.getFunctionInfo(BuiltinFunctions.SCAN_COLLECTION),
+                            mkSingletonArrayList(new MutableObject<>(varRef)));
                     scanCollExpr2.setSourceLocation(sourceLoc);
-                    return scanCollExpr2;
+                    return new Pair<>(scanCollExpr2, new MutableObject<>(assignOp));
+                } else {
+                    UnnestingFunctionCallExpression scanCollExpr3 = new UnnestingFunctionCallExpression(
+                            FunctionUtil.getFunctionInfo(BuiltinFunctions.SCAN_COLLECTION),
+                            mkSingletonArrayList(new MutableObject<>(expr)));
+                    scanCollExpr3.setSourceLocation(sourceLoc);
+                    return new Pair<>(scanCollExpr3, topOpRef);
                 }
             default:
-                return expr;
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc);
         }
     }
 
+    /**
+     * Whether an Assign operator needs to be introduced when unnesting this function call expression.
+     */
+    private boolean unnestNeedsAssign(AbstractFunctionCallExpression fce) {
+        return BuiltinFunctions.getAggregateFunction(fce.getFunctionIdentifier()) != null;
+    }
+
     private boolean rebindBottomOpRef(ILogicalOperator currentOp, Mutable<ILogicalOperator> opRef,
             Mutable<ILogicalOperator> replacementOpRef) {
         int index = 0;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index 7d7e7fe..6ec74cf 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -311,16 +311,17 @@
         LogicalVariable fromVar = context.newVarFromExpression(fromTerm.getLeftVariable());
         Expression fromExpr = fromTerm.getLeftExpression();
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = langExprToAlgExpression(fromExpr, tupSource);
+        Pair<ILogicalExpression, Mutable<ILogicalOperator>> pUnnestExpr = makeUnnestExpression(eo.first, eo.second);
         UnnestOperator unnestOp;
         if (fromTerm.hasPositionalVariable()) {
             LogicalVariable pVar = context.newVarFromExpression(fromTerm.getPositionalVariable());
             // We set the positional variable type as BIGINT type.
-            unnestOp = new UnnestOperator(fromVar, new MutableObject<>(makeUnnestExpression(eo.first)), pVar,
-                    BuiltinType.AINT64, new PositionWriter());
+            unnestOp = new UnnestOperator(fromVar, new MutableObject<>(pUnnestExpr.first), pVar, BuiltinType.AINT64,
+                    new PositionWriter());
         } else {
-            unnestOp = new UnnestOperator(fromVar, new MutableObject<>(makeUnnestExpression(eo.first)));
+            unnestOp = new UnnestOperator(fromVar, new MutableObject<>(pUnnestExpr.first));
         }
-        unnestOp.getInputs().add(eo.second);
+        unnestOp.getInputs().add(pUnnestExpr.second);
         unnestOp.setSourceLocation(sourceLoc);
 
         // Processes joins, unnests, and nests.
@@ -445,9 +446,11 @@
             LogicalVariable outerUnnestVar = context.newVar();
             VariableReferenceExpression aggVarRefExpr = new VariableReferenceExpression(aggVar);
             aggVarRefExpr.setSourceLocation(aggOp.getSourceLocation());
-            LeftOuterUnnestOperator outerUnnestOp = new LeftOuterUnnestOperator(outerUnnestVar,
-                    new MutableObject<>(makeUnnestExpression(aggVarRefExpr)));
-            outerUnnestOp.getInputs().add(new MutableObject<>(subplanOp));
+            Pair<ILogicalExpression, Mutable<ILogicalOperator>> pUnnestExpr =
+                    makeUnnestExpression(aggVarRefExpr, new MutableObject<>(subplanOp));
+            LeftOuterUnnestOperator outerUnnestOp =
+                    new LeftOuterUnnestOperator(outerUnnestVar, new MutableObject<>(pUnnestExpr.first));
+            outerUnnestOp.getInputs().add(pUnnestExpr.second);
             outerUnnestOp.setSourceLocation(aggOp.getSourceLocation());
             currentTopOp = outerUnnestOp;
 
@@ -528,20 +531,21 @@
         LogicalVariable rightVar = context.newVarFromExpression(binaryCorrelate.getRightVariable());
         Expression rightExpr = binaryCorrelate.getRightExpression();
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = langExprToAlgExpression(rightExpr, inputOpRef);
+        Pair<ILogicalExpression, Mutable<ILogicalOperator>> pUnnestExpr = makeUnnestExpression(eo.first, eo.second);
         AbstractUnnestOperator unnestOp;
         if (binaryCorrelate.hasPositionalVariable()) {
             LogicalVariable pVar = context.newVarFromExpression(binaryCorrelate.getPositionalVariable());
             // We set the positional variable type as BIGINT type.
             unnestOp = innerUnnest
-                    ? new UnnestOperator(rightVar, new MutableObject<>(makeUnnestExpression(eo.first)), pVar,
-                            BuiltinType.AINT64, new PositionWriter())
-                    : new LeftOuterUnnestOperator(rightVar, new MutableObject<>(makeUnnestExpression(eo.first)), pVar,
+                    ? new UnnestOperator(rightVar, new MutableObject<>(pUnnestExpr.first), pVar, BuiltinType.AINT64,
+                            new PositionWriter())
+                    : new LeftOuterUnnestOperator(rightVar, new MutableObject<>(pUnnestExpr.first), pVar,
                             BuiltinType.AINT64, new PositionWriter());
         } else {
-            unnestOp = innerUnnest ? new UnnestOperator(rightVar, new MutableObject<>(makeUnnestExpression(eo.first)))
-                    : new LeftOuterUnnestOperator(rightVar, new MutableObject<>(makeUnnestExpression(eo.first)));
+            unnestOp = innerUnnest ? new UnnestOperator(rightVar, new MutableObject<>(pUnnestExpr.first))
+                    : new LeftOuterUnnestOperator(rightVar, new MutableObject<>(pUnnestExpr.first));
         }
-        unnestOp.getInputs().add(eo.second);
+        unnestOp.getInputs().add(pUnnestExpr.second);
         unnestOp.setSourceLocation(binaryCorrelate.getRightVariable().getSourceLocation());
         return new Pair<>(unnestOp, rightVar);
     }
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate-sql-sugar/array_agg/array_agg.1.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate-sql-sugar/array_agg/array_agg.1.sqlpp
new file mode 100644
index 0000000..7d626e9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate-sql-sugar/array_agg/array_agg.1.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Iteration over SQL++ core aggregate for array_agg().
+ *              : Make sure there is no listify() in the query plan
+ * Expected Res : Success
+ */
+
+select value t.x
+from strict_arrayagg((from range(1,2) r select r x)) t
+order by t.x;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/array_agg/array_agg.1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/array_agg/array_agg.1.plan
new file mode 100644
index 0000000..bb0f33b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/array_agg/array_agg.1.plan
@@ -0,0 +1,6 @@
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    -- STABLE_SORT [$$r(ASC)]  |UNPARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+        -- UNNEST  |UNPARTITIONED|
+          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql-sugar/array_agg/array_agg.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql-sugar/array_agg/array_agg.7.query.sqlpp
new file mode 100644
index 0000000..afe6c41
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-sql-sugar/array_agg/array_agg.7.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Iteration over SQL++ core aggregate for array_agg()
+ * Expected Res : Success
+ */
+
+select value t.x
+from strict_arrayagg((from range(1,2) r select r x)) t
+order by t.x;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql-sugar/array_agg/array_agg.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql-sugar/array_agg/array_agg.7.adm
new file mode 100644
index 0000000..7a754f4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-sql-sugar/array_agg/array_agg.7.adm
@@ -0,0 +1,2 @@
+1
+2
\ No newline at end of file