Fix query plans for constant aggregates.

- Fixes for both global aggregates and group-by aggregates.
- Allow optimizer tests to test sql++ queries.

Change-Id: I8c2b9f4d566e62d56efe155554a317ea333420a6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/876
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 814c570..03ea289 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -154,7 +154,6 @@
         normalization.add(new ExtractGbyExpressionsRule());
         normalization.add(new ExtractDistinctByExpressionsRule());
         normalization.add(new ExtractOrderExpressionsRule());
-        normalization.add(new AsterixMoveFreeVariableOperatorOutOfSubplanRule());
 
         // IntroduceStaticTypeCastRule should go before
         // IntroduceDynamicTypeCastRule to
@@ -205,6 +204,9 @@
         condPushDownAndJoinInference.add(new PushSubplanIntoGroupByRule());
         condPushDownAndJoinInference.add(new NestedSubplanToJoinRule());
         condPushDownAndJoinInference.add(new EliminateSubplanWithInputCardinalityOneRule());
+        // The following rule should be fired after PushAggregateIntoGroupbyRule because
+        // pulling invariants out of a subplan will make PushAggregateIntoGroupby harder.
+        condPushDownAndJoinInference.add(new AsterixMoveFreeVariableOperatorOutOfSubplanRule());
 
         return condPushDownAndJoinInference;
     }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
index e5ffa2e..29d8532 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
@@ -20,6 +20,7 @@
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.asterix.lang.common.util.FunctionUtil;
@@ -155,11 +156,28 @@
         if (varUsedAbove.contains(unnestedVar)) {
             return false;
         }
+        Mutable<ILogicalOperator> aggregateParentRef = opRef;
+        AbstractLogicalOperator r = op1;
+        boolean metAggregate = false;
+        while (r.getInputs().size() == 1) {
+            aggregateParentRef = r.getInputs().get(0);
+            r = (AbstractLogicalOperator) aggregateParentRef.getValue();
+            if (r.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+                AssignOperator assign = (AssignOperator) r;
+                List<LogicalVariable> variables = assign.getVariables();
+                // The assign operator doesn't produce any variable that is used by the unnest.
+                if (variables.contains(unnestedVar)) {
+                    return false;
+                }
+            } else {
+                if (r.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                    metAggregate = true;
+                }
+                break;
+            }
+        }
 
-        Mutable<ILogicalOperator> opRef2 = op1.getInputs().get(0);
-        AbstractLogicalOperator r = (AbstractLogicalOperator) opRef2.getValue();
-
-        if (r.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+        if (!metAggregate) {
             return false;
         }
         AggregateOperator agg = (AggregateOperator) r;
@@ -185,9 +203,9 @@
         }
         LogicalVariable paramVar = ((VariableReferenceExpression) arg0).getVariableReference();
 
-        ArrayList<LogicalVariable> assgnVars = new ArrayList<LogicalVariable>(1);
+        List<LogicalVariable> assgnVars = new ArrayList<>(1);
         assgnVars.add(unnest1.getVariable());
-        ArrayList<Mutable<ILogicalExpression>> assgnExprs = new ArrayList<Mutable<ILogicalExpression>>(1);
+        List<Mutable<ILogicalExpression>> assgnExprs = new ArrayList<>(1);
         assgnExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(paramVar)));
         AssignOperator assign = new AssignOperator(assgnVars, assgnExprs);
         assign.getInputs().add(agg.getInputs().get(0));
@@ -195,20 +213,22 @@
         LogicalVariable posVar = unnest1.getPositionalVariable();
 
         if (posVar == null) {
-            opRef.setValue(assign);
+            // Removes the aggregate operator.
+            aggregateParentRef.setValue(assign);
         } else {
-            ArrayList<LogicalVariable> raggVars = new ArrayList<LogicalVariable>(1);
+            List<LogicalVariable> raggVars = new ArrayList<>(1);
             raggVars.add(posVar);
-            ArrayList<Mutable<ILogicalExpression>> rAggExprs = new ArrayList<Mutable<ILogicalExpression>>(1);
+            List<Mutable<ILogicalExpression>> rAggExprs = new ArrayList<>(1);
             StatefulFunctionCallExpression tidFun = new StatefulFunctionCallExpression(
                     FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.TID), UnpartitionedPropertyComputer.INSTANCE);
             rAggExprs.add(new MutableObject<ILogicalExpression>(tidFun));
             RunningAggregateOperator rAgg = new RunningAggregateOperator(raggVars, rAggExprs);
             rAgg.getInputs().add(new MutableObject<ILogicalOperator>(assign));
-            opRef.setValue(rAgg);
+            aggregateParentRef.setValue(rAgg);
             context.computeAndSetTypeEnvironmentForOperator(rAgg);
         }
-
+        // Removes the unnest operator.
+        opRef.setValue(unnest1.getInputs().get(0).getValue());
         return true;
     }
 
@@ -243,6 +263,7 @@
         if (agg.getInputs().size() == 0) {
             return false;
         }
+
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) agg.getInputs().get(0).getValue();
         if (op2.getOperatorTag() != LogicalOperatorTag.UNNEST) {
             return false;
@@ -266,7 +287,7 @@
             return false;
         }
 
-        ArrayList<LogicalVariable> assgnVars = new ArrayList<LogicalVariable>(1);
+        List<LogicalVariable> assgnVars = new ArrayList<>(1);
         assgnVars.add(aggVar);
         AssignOperator assign = new AssignOperator(assgnVars, scanFunc.getArguments());
         assign.getInputs().add(unnest.getInputs().get(0));
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
index 193e0aa..d0be428 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
@@ -34,11 +34,13 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.IdentitiyResolverFactory;
 import org.apache.asterix.test.base.AsterixTestHelper;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.runtime.HDFSCluster;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.BeforeClass;
@@ -54,7 +56,8 @@
     private static final Logger LOGGER = Logger.getLogger(OptimizerTest.class.getName());
 
     private static final String SEPARATOR = File.separator;
-    private static final String EXTENSION_QUERY = "aql";
+    private static final String EXTENSION_AQL = "aql";
+    private static final String EXTENSION_SQLPP = "sqlpp";
     private static final String EXTENSION_RESULT = "plan";
     private static final String FILENAME_IGNORE = "ignore.txt";
     private static final String FILENAME_ONLY = "only.txt";
@@ -67,7 +70,8 @@
     private static final ArrayList<String> ignore = AsterixTestHelper.readFile(FILENAME_IGNORE, PATH_BASE);
     private static final ArrayList<String> only = AsterixTestHelper.readFile(FILENAME_ONLY, PATH_BASE);
     private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
-    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
+    private static final ILangCompilationProvider aqlCompilationProvider = new AqlCompilationProvider();
+    private static final ILangCompilationProvider sqlppCompilationProvider = new SqlppCompilationProvider();
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -104,7 +108,7 @@
                 suiteBuildPerFile(innerfile, testArgs, subdir);
             }
         }
-        if (file.isFile() && file.getName().endsWith(EXTENSION_QUERY)) {
+        if (file.isFile() && (file.getName().endsWith(EXTENSION_AQL) || file.getName().endsWith(EXTENSION_SQLPP))) {
             String resultFileName = AsterixTestHelper.extToResExt(file.getName(), EXTENSION_RESULT);
             File expectedFile = new File(PATH_EXPECTED + path + resultFileName);
             File actualFile = new File(PATH_ACTUAL + SEPARATOR + path + resultFileName);
@@ -162,8 +166,10 @@
             actualFile.getParentFile().mkdirs();
 
             PrintWriter plan = new PrintWriter(actualFile);
-            AsterixJavaClient asterix = new AsterixJavaClient(
-                    AsterixHyracksIntegrationUtil.getHyracksClientConnection(), query, plan, compilationProvider);
+            ILangCompilationProvider provider = queryFile.getName().endsWith("aql") ? aqlCompilationProvider
+                    : sqlppCompilationProvider;
+            IHyracksClientConnection hcc = AsterixHyracksIntegrationUtil.getHyracksClientConnection();
+            AsterixJavaClient asterix = new AsterixJavaClient(hcc, query, plan, provider);
             try {
                 asterix.compile(true, false, false, true, true, false, false);
             } catch (AsterixException e) {
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate/constant-agg.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate/constant-agg.sqlpp
new file mode 100644
index 0000000..a3f8ea1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate/constant-agg.sqlpp
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  database TinySocial if exists;
+create  database TinySocial;
+
+use TinySocial;
+
+
+create type TinySocial.TwitterUserType as
+{
+  "screen-name" : string
+}
+
+create type TinySocial.TweetMessageType as {
+  tweetid : string
+}
+
+create type TinySocial.FacebookUserType as
+ open {
+  id : int64
+}
+
+create type TinySocial.FacebookMessageType as
+ open {
+  "message-id" : int64
+}
+
+create  table FacebookUsers(FacebookUserType) primary key id;
+create  table FacebookMessages(FacebookMessageType) primary key "message-id";
+create  table TwitterUsers(TwitterUserType) primary key "screen-name";
+create  table TweetMessages(TweetMessageType) primary key tweetid hints ("CARDINALITY"="100");
+
+
+SELECT COUNT(1) count
+FROM FacebookUsers u;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate/constant-gby-agg.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate/constant-gby-agg.sqlpp
new file mode 100644
index 0000000..dcd770e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate/constant-gby-agg.sqlpp
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  database TinySocial if exists;
+create  database TinySocial;
+
+use TinySocial;
+
+
+create type TinySocial.TwitterUserType as
+{
+  "screen-name" : string
+}
+
+create type TinySocial.TweetMessageType as {
+  tweetid : string
+}
+
+create type TinySocial.FacebookUserType as
+ open {
+  id : int64
+}
+
+create type TinySocial.FacebookMessageType as
+ open {
+  "message-id" : int64
+}
+
+create  table FacebookUsers(FacebookUserType) primary key id;
+create  table FacebookMessages(FacebookMessageType) primary key "message-id";
+create  table TwitterUsers(TwitterUserType) primary key "screen-name";
+create  table TweetMessages(TweetMessageType) primary key tweetid hints ("CARDINALITY"="100");
+
+
+SELECT alias alias, COUNT(1) count
+FROM FacebookUsers u
+GROUP BY u.alias AS alias;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate/constant-agg.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate/constant-agg.plan
new file mode 100644
index 0000000..790a5d2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate/constant-agg.plan
@@ -0,0 +1,13 @@
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      -- ASSIGN  |UNPARTITIONED|
+        -- AGGREGATE  |UNPARTITIONED|
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            -- AGGREGATE  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate/constant-gby-agg.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate/constant-gby-agg.plan
new file mode 100644
index 0000000..dec5a77
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate/constant-gby-agg.plan
@@ -0,0 +1,24 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$29]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$29(ASC)] HASH:[$$29]  |PARTITIONED|
+              -- SORT_GROUP_BY[$$25]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|