checkpoint: added supporting rules and runtimes for running aggregation.
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
index 8ef4847..5dd5d27 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
@@ -47,6 +47,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -229,8 +230,6 @@
 
             AssignOperator gbyKeyAssign = new AssignOperator(gbyKeyAssgnVars, gbyKeyAssgnExprs);
             gbyKeyAssign.getInputs().add(gby.getInputs().get(0));
-            
-            context.computeAndSetTypeEnvironmentForOperator(gbyKeyAssign);
 
             // add sort to replace group-by
             List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
@@ -241,16 +240,45 @@
             OrderOperator order = new OrderOperator(orderExprs);
             order.getInputs().add(new MutableObject<ILogicalOperator>(gbyKeyAssign));
 
-            context.computeAndSetTypeEnvironmentForOperator(order);
-
-            assign.getInputs().add(new MutableObject<ILogicalOperator>(order));
             opRef.setValue(assign);
-        } else {
-            
-            return false;
-        }
+            assign.getInputs().add(new MutableObject<ILogicalOperator>(order));
 
-        context.computeAndSetTypeEnvironmentForOperator(assign);
+            context.computeAndSetTypeEnvironmentForOperator(gbyKeyAssign);
+            context.computeAndSetTypeEnvironmentForOperator(order);
+            context.computeAndSetTypeEnvironmentForOperator(assign);
+
+        } else {
+            // if positional variable is used in unnest, the unnest will be pushed into the group-by as a running-aggregate
+
+            // First create assign for the unnest variable
+            List<LogicalVariable> nestedAssignVars = new ArrayList<LogicalVariable>();
+            List<Mutable<ILogicalExpression>> nestedAssignExprs = new ArrayList<Mutable<ILogicalExpression>>();
+            nestedAssignVars.add(unnest1.getVariable());
+            nestedAssignExprs.add(new MutableObject<ILogicalExpression>(arg0));
+            AssignOperator nestedAssign = new AssignOperator(nestedAssignVars, nestedAssignExprs);
+            nestedAssign.getInputs().add(opRef2);
+
+            // Then create running aggregation for the positional variable
+            List<LogicalVariable> raggVars = new ArrayList<LogicalVariable>();
+            List<Mutable<ILogicalExpression>> raggExprs = new ArrayList<Mutable<ILogicalExpression>>();
+            raggVars.add(posVar);
+            StatefulFunctionCallExpression fce = new StatefulFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.TID), UnpartitionedPropertyComputer.INSTANCE);
+            raggExprs.add(new MutableObject<ILogicalExpression>(fce));
+            RunningAggregateOperator raggOp = new RunningAggregateOperator(raggVars, raggExprs);
+            raggOp.setExecutionMode(unnest1.getExecutionMode());
+            RunningAggregatePOperator raggPOp = new RunningAggregatePOperator();
+            raggOp.setPhysicalOperator(raggPOp);
+            raggOp.getInputs().add(nestedPlanRoot.getInputs().get(0));
+            gby.getNestedPlans().get(0).getRoots().set(0, new MutableObject<ILogicalOperator>(raggOp));
+            
+            opRef.setValue(nestedAssign);
+
+            context.computeAndSetTypeEnvironmentForOperator(nestedAssign);
+            context.computeAndSetTypeEnvironmentForOperator(raggOp);
+            context.computeAndSetTypeEnvironmentForOperator(gby);
+
+        }
 
         return true;
     }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PullPositionalVariableFromUnnestRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PullPositionalVariableFromUnnestRule.java
index efca336..004e221 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PullPositionalVariableFromUnnestRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PullPositionalVariableFromUnnestRule.java
@@ -20,24 +20,17 @@
 import org.apache.commons.lang3.mutable.MutableObject;
 
 import edu.uci.ics.asterix.aql.util.FunctionUtils;
-import edu.uci.ics.asterix.om.base.AInt32;
-import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
-import edu.uci.ics.asterix.runtime.evaluators.functions.FieldAccessByIndexDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -50,8 +43,7 @@
     }
 
     @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
             return false;
@@ -64,45 +56,18 @@
         ArrayList<LogicalVariable> rOpVars = new ArrayList<LogicalVariable>();
         rOpVars.add(p);
         ArrayList<Mutable<ILogicalExpression>> rOpExprList = new ArrayList<Mutable<ILogicalExpression>>();
-
-        ArrayList<Mutable<ILogicalExpression>> fceArgsList = new ArrayList<>();
-
-        // add the argument for the input
-        fceArgsList.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(unnest.getVariable())));
-
-        // add the reference index, which should be the last column in the tuple produced by an unnest operator
-        fceArgsList.add(new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(
-                new AInt32(unnest.getSchema().size())))));
-
         StatefulFunctionCallExpression fce = new StatefulFunctionCallExpression(
-                FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX),
-                UnpartitionedPropertyComputer.INSTANCE, fceArgsList);
-
-        //StatefulFunctionCallExpression fce = new StatefulFunctionCallExpression(
-        //        FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.TID), UnpartitionedPropertyComputer.INSTANCE);
+                FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.TID), UnpartitionedPropertyComputer.INSTANCE);
         rOpExprList.add(new MutableObject<ILogicalExpression>(fce));
-
-        AssignOperator aOp = new AssignOperator(rOpVars, rOpExprList);
-
-        //RunningAggregateOperator rOp = new RunningAggregateOperator(rOpVars, rOpExprList);
-        aOp.setExecutionMode(unnest.getExecutionMode());
-
-        AssignPOperator aPop = new AssignPOperator();
-        aOp.setPhysicalOperator(aPop);
-        aOp.getInputs().add(new MutableObject<ILogicalOperator>(unnest));
-
-        opRef.setValue(aOp);
+        RunningAggregateOperator rOp = new RunningAggregateOperator(rOpVars, rOpExprList);
+        rOp.setExecutionMode(unnest.getExecutionMode());
+        RunningAggregatePOperator rPop = new RunningAggregatePOperator();
+        rOp.setPhysicalOperator(rPop);
+        rOp.getInputs().add(new MutableObject<ILogicalOperator>(unnest));
+        opRef.setValue(rOp);
         unnest.setPositionalVariable(null);
-        context.computeAndSetTypeEnvironmentForOperator(aOp);
+        context.computeAndSetTypeEnvironmentForOperator(rOp);
         context.computeAndSetTypeEnvironmentForOperator(unnest);
-
-        //        RunningAggregatePOperator rPop = new RunningAggregatePOperator();
-        //        rOp.setPhysicalOperator(rPop);
-        //        rOp.getInputs().add(new MutableObject<ILogicalOperator>(unnest));
-        //        opRef.setValue(rOp);
-        //        unnest.setPositionalVariable(null);
-        //        context.computeAndSetTypeEnvironmentForOperator(rOp);
-        //        context.computeAndSetTypeEnvironmentForOperator(unnest);
         return true;
     }
-}
+}
\ No newline at end of file
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 500f581..7199f7a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -95,49 +95,67 @@
                 ILogicalPlan p0 = gby.getNestedPlans().get(0);
                 if (p0.getRoots().size() == 1) {
                     Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
-                    AggregateOperator aggOp = (AggregateOperator) r0.getValue();
-                    boolean serializable = true;
-                    for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
-                        AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef.getValue();
-                        if (!AsterixBuiltinFunctions.isAggregateFunctionSerializable(expr.getFunctionIdentifier())) {
-                            serializable = false;
-                            break;
-                        }
-                    }
-
-                    if ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
-                            .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
-                        if (serializable) {
-                            // if not serializable, use external group-by
-                            int i = 0;
-                            for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
-                                AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef
-                                        .getValue();
-                                AggregateFunctionCallExpression serialAggExpr = AsterixBuiltinFunctions
-                                        .makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(),
-                                                expr.getArguments());
-                                aggOp.getExpressions().get(i).setValue(serialAggExpr);
-                                i++;
+                    if (((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().equals(
+                            LogicalOperatorTag.AGGREGATE)) {
+                        AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+                        boolean serializable = true;
+                        for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
+                            AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef.getValue();
+                            if (!AsterixBuiltinFunctions.isAggregateFunctionSerializable(expr.getFunctionIdentifier())) {
+                                serializable = false;
+                                break;
                             }
+                        }
 
-                            ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(gby.getGroupByList(),
-                                    physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
-                                    physicalOptimizationConfig.getExternalGroupByTableSize());
-                            op.setPhysicalOperator(externalGby);
-                            generateMergeAggregationExpressions(gby, context);
-                        } else {
-                            // if not serializable, use pre-clustered group-by
-                            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
-                            List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
-                            for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
-                                ILogicalExpression expr = p.second.getValue();
-                                if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                                    VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
-                                    columnList.add(varRef.getVariableReference());
+                        if ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
+                                .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
+                            if (serializable) {
+                                // if not serializable, use external group-by
+                                int i = 0;
+                                for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
+                                    AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef
+                                            .getValue();
+                                    AggregateFunctionCallExpression serialAggExpr = AsterixBuiltinFunctions
+                                            .makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(),
+                                                    expr.getArguments());
+                                    aggOp.getExpressions().get(i).setValue(serialAggExpr);
+                                    i++;
                                 }
+
+                                ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
+                                        gby.getGroupByList(), physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
+                                        physicalOptimizationConfig.getExternalGroupByTableSize());
+                                op.setPhysicalOperator(externalGby);
+                                generateMergeAggregationExpressions(gby, context);
+                            } else {
+                                // if not serializable, use pre-clustered group-by
+                                List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
+                                List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
+                                for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
+                                    ILogicalExpression expr = p.second.getValue();
+                                    if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                                        VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+                                        columnList.add(varRef.getVariableReference());
+                                    }
+                                }
+                                op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList));
                             }
-                            op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList));
                         }
+                    } else if (((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().equals(
+                            LogicalOperatorTag.RUNNINGAGGREGATE)) {
+                        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
+                        List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
+                        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
+                            ILogicalExpression expr = p.second.getValue();
+                            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                                VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+                                columnList.add(varRef.getVariableReference());
+                            }
+                        }
+                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList));
+                    } else {
+                        throw new AlgebricksException("Unsupported nested operator within a group-by: "
+                                + ((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().name());
                     }
                 }
             }