Fix issue827

Change-Id: If21e2a4efc69e46ee452353fc06507b3f890db0d
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/189
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Preston Carman <ecarm002@ucr.edu>
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 9626774..e6e610d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -94,8 +94,8 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.NestedSubplanToJoinRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushAssignBelowUnionAllRule;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushMapOperatorDownThroughProductRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushGroupByIntoSortRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushMapOperatorDownThroughProductRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushNestedOrderByUnderPreSortedGroupByRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectDownRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectDownRule;
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
index c157063..412903a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
@@ -33,8 +33,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -59,26 +61,119 @@
         if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
             return false;
         }
+        AssignOperator assignOp = (AssignOperator) op;
 
         Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
-        if (op2.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
-            return false;
+        if (op2.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+            AggregateOperator aggOp = (AggregateOperator) op2;
+            // Make sure the agg expr is a listify.
+            return pushAggregateFunction(aggOp, assignOp, context);
+        } else if (op2.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+                || op2.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+            AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op2;
+            // Tries to push aggregates through the join.
+            if (containsAggregate(assignOp.getExpressions()) && pushableThroughJoin(join)) {
+                pushAggregateFunctionThroughJoin(join, assignOp, context);
+                return true;
+            }
         }
-        // If there's a group by below the agg, then we want to have the agg pushed into the group by.
-        Mutable<ILogicalOperator> opRef3 = op2.getInputs().get(0);
+        return false;
+    }
+
+    /**
+     * Recursively check whether the list of expressions contains an aggregate function.
+     * 
+     * @param exprRefs
+     * @return true if the list contains an aggregate function and false otherwise.
+     */
+    private boolean containsAggregate(List<Mutable<ILogicalExpression>> exprRefs) {
+        for (Mutable<ILogicalExpression> exprRef : exprRefs) {
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                continue;
+            }
+            AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+            FunctionIdentifier funcIdent = AsterixBuiltinFunctions.getAggregateFunction(funcExpr
+                    .getFunctionIdentifier());
+            if (funcIdent == null) {
+                // Recursively look in func args.
+                if (containsAggregate(funcExpr.getArguments())) {
+                    return true;
+                }
+            } else {
+                // This is an aggregation function.
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Check whether the join is aggregate-pushable, that is,
+     * 1) the join condition is true;
+     * 2) each join branch produces only one tuple.
+     * 
+     * @param join
+     * @return true if pushable
+     */
+    private boolean pushableThroughJoin(AbstractBinaryJoinOperator join) {
+        ILogicalExpression condition = join.getCondition().getValue();
+        if (condition.equals(ConstantExpression.TRUE)) {
+            // Checks if the aggregation functions are pushable through the join
+            boolean pushable = true;
+            for (Mutable<ILogicalOperator> branchRef : join.getInputs()) {
+                AbstractLogicalOperator branch = (AbstractLogicalOperator) branchRef.getValue();
+                if (branch.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                    pushable &= true;
+                } else if (branch.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+                        || branch.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+                    AbstractBinaryJoinOperator childJoin = (AbstractBinaryJoinOperator) branch;
+                    pushable &= pushableThroughJoin(childJoin);
+                } else {
+                    pushable &= false;
+                }
+            }
+            return pushable;
+        }
+        return false;
+    }
+
+    /**
+     * Does the actual push of aggregates for qualified joins.
+     * 
+     * @param join
+     * @param assignOp
+     *            that contains aggregate function calls.
+     * @param context
+     * @throws AlgebricksException
+     */
+    private void pushAggregateFunctionThroughJoin(AbstractBinaryJoinOperator join, AssignOperator assignOp,
+            IOptimizationContext context) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> branchRef : join.getInputs()) {
+            AbstractLogicalOperator branch = (AbstractLogicalOperator) branchRef.getValue();
+            if (branch.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                AggregateOperator aggOp = (AggregateOperator) branch;
+                pushAggregateFunction(aggOp, assignOp, context);
+            } else if (branch.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+                    || branch.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+                AbstractBinaryJoinOperator childJoin = (AbstractBinaryJoinOperator) branch;
+                pushAggregateFunctionThroughJoin(childJoin, assignOp, context);
+            }
+        }
+    }
+
+    private boolean pushAggregateFunction(AggregateOperator aggOp, AssignOperator assignOp, IOptimizationContext context)
+            throws AlgebricksException {
+        Mutable<ILogicalOperator> opRef3 = aggOp.getInputs().get(0);
         AbstractLogicalOperator op3 = (AbstractLogicalOperator) opRef3.getValue();
+        // If there's a group by below the agg, then we want to have the agg pushed into the group by.
         if (op3.getOperatorTag() == LogicalOperatorTag.GROUP) {
             return false;
         }
-
-        AssignOperator assignOp = (AssignOperator) op;
-        AggregateOperator aggOp = (AggregateOperator) op2;
         if (aggOp.getVariables().size() != 1) {
             return false;
         }
-
-        // Make sure the agg expr is a listify.
         ILogicalExpression aggExpr = aggOp.getExpressions().get(0).getValue();
         if (aggExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
             return false;
@@ -128,7 +223,6 @@
 
         context.computeAndSetTypeEnvironmentForOperator(aggOp);
         context.computeAndSetTypeEnvironmentForOperator(assignOp);
-
         return true;
     }
 
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/RemoveRedundantListifyRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/RemoveRedundantListifyRule.java
index cdad37c..7e92ca7 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/RemoveRedundantListifyRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/RemoveRedundantListifyRule.java
@@ -16,6 +16,7 @@
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -82,6 +83,7 @@
     private boolean applyRuleDown(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> varSet,
             IOptimizationContext context) throws AlgebricksException {
         boolean changed = applies(opRef, varSet, context);
+        changed |= appliesForReverseCase(opRef, varSet, context);
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         VariableUtilities.getUsedVariables(op, varSet);
         if (op.hasNestedPlans()) {
@@ -195,4 +197,58 @@
         return true;
     }
 
+    private boolean appliesForReverseCase(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> varUsedAbove,
+            IOptimizationContext context) throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        AggregateOperator agg = (AggregateOperator) op1;
+        if (agg.getVariables().size() > 1 || agg.getVariables().size() <= 0) {
+            return false;
+        }
+        LogicalVariable aggVar = agg.getVariables().get(0);
+        ILogicalExpression aggFun = agg.getExpressions().get(0).getValue();
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) aggFun;
+        if (!AsterixBuiltinFunctions.LISTIFY.equals(f.getFunctionIdentifier())) {
+            return false;
+        }
+        if (f.getArguments().size() != 1) {
+            return false;
+        }
+        ILogicalExpression arg0 = f.getArguments().get(0).getValue();
+        if (((AbstractLogicalExpression) arg0).getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+            return false;
+        }
+        LogicalVariable aggInputVar = ((VariableReferenceExpression) arg0).getVariableReference();
+
+        if (agg.getInputs().size() == 0) {
+            return false;
+        }
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) agg.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.UNNEST) {
+            return false;
+        }
+        UnnestOperator unnest = (UnnestOperator) op2;
+        if (unnest.getPositionalVariable() != null) {
+            return false;
+        }
+        if (!unnest.getVariable().equals(aggInputVar)) {
+            return false;
+        }
+        List<LogicalVariable> unnestSource = new ArrayList<LogicalVariable>();
+        VariableUtilities.getUsedVariables(unnest, unnestSource);
+        if (unnestSource.size() > 1 || unnestSource.size() <= 0) {
+            return false;
+        }
+        ArrayList<LogicalVariable> assgnVars = new ArrayList<LogicalVariable>(1);
+        assgnVars.add(aggVar);
+        ArrayList<Mutable<ILogicalExpression>> assgnExprs = new ArrayList<Mutable<ILogicalExpression>>(1);
+        assgnExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(unnestSource.get(0))));
+        AssignOperator assign = new AssignOperator(assgnVars, assgnExprs);
+        assign.getInputs().add(unnest.getInputs().get(0));
+        context.computeAndSetTypeEnvironmentForOperator(assign);
+        opRef.setValue(assign);
+        return true;
+    }
 }
diff --git a/asterix-app/src/test/resources/optimizerts/queries/query-issue827-2.aql b/asterix-app/src/test/resources/optimizerts/queries/query-issue827-2.aql
new file mode 100644
index 0000000..270383d
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/query-issue827-2.aql
@@ -0,0 +1,52 @@
+/*
+ * Description  : This test case is to verify the fix for issue827
+ * https://code.google.com/p/asterixdb/issues/detail?id=827
+ * Expected Res : SUCCESS
+ * Date         : 3rd Dec. 2014
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int32,
+  l_partkey: int32,
+  l_suppkey: int32,
+  l_linenumber: int32,
+  l_quantity: double,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber;
+
+let $qty := for $i in dataset('LineItem') where $i.l_shipdate <= '1998-09-02' return $i.l_quantity
+let $base_price := for $i in dataset('LineItem') return $i.l_extendedprice
+let $disc_price := for $i in dataset('LineItem') return $i.l_extendedprice * (1 - $i.l_discount)
+let $charge := for $i in dataset('LineItem') return $i.l_extendedprice * (1 - $i.l_discount) * (1 + $i.l_tax)
+let $price := for $i in dataset('LineItem') return $i.l_extendedprice
+let $disc := for $i in dataset('LineItem') return $i.l_discount
+let $order := for $l in dataset('LineItem') return $l
+return {
+  "sum_qty_partial": sum($qty),
+  "sum_base_price": sum($base_price),
+  "sum_disc_price": sum($disc_price),
+  "sum_charge": sum($charge),
+  "ave_qty": avg($qty),
+  "ave_price": avg($price),
+  "ave_disc": avg($disc),
+  "count_order": count($order)
+}
+
diff --git a/asterix-app/src/test/resources/optimizerts/results/query-issue827-2.plan b/asterix-app/src/test/resources/optimizerts/results/query-issue827-2.plan
new file mode 100644
index 0000000..0d68333
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/query-issue827-2.plan
@@ -0,0 +1,135 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- NESTED_LOOP  |PARTITIONED|
+            -- BROADCAST_EXCHANGE  |PARTITIONED|
+              -- NESTED_LOOP  |PARTITIONED|
+                -- BROADCAST_EXCHANGE  |PARTITIONED|
+                  -- NESTED_LOOP  |PARTITIONED|
+                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                      -- NESTED_LOOP  |PARTITIONED|
+                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                          -- NESTED_LOOP  |PARTITIONED|
+                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                              -- NESTED_LOOP  |PARTITIONED|
+                                -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                  -- AGGREGATE  |UNPARTITIONED|
+                                    -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                      -- AGGREGATE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- STREAM_SELECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- SPLIT  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                  -- AGGREGATE  |UNPARTITIONED|
+                                    -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                      -- AGGREGATE  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- SPLIT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- SPLIT  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                              -- AGGREGATE  |UNPARTITIONED|
+                                -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                  -- AGGREGATE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- SPLIT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                          -- AGGREGATE  |UNPARTITIONED|
+                            -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                              -- AGGREGATE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- SPLIT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                      -- AGGREGATE  |UNPARTITIONED|
+                        -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                          -- AGGREGATE  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- SPLIT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- SPLIT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                  -- AGGREGATE  |UNPARTITIONED|
+                    -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                      -- AGGREGATE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- SPLIT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+              -- AGGREGATE  |UNPARTITIONED|
+                -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                  -- AGGREGATE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- SPLIT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827-2/query-issue827-2.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827-2/query-issue827-2.1.ddl.aql
new file mode 100644
index 0000000..ea93496
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827-2/query-issue827-2.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+ * Description  : This test case is to verify the fix for issue827
+ * https://code.google.com/p/asterixdb/issues/detail?id=827
+ * Expected Res : SUCCESS
+ * Date         : 16th Nov. 2014
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int32,
+  l_partkey: int32,
+  l_suppkey: int32,
+  l_linenumber: int32,
+  l_quantity: double,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber;
+
diff --git a/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827-2/query-issue827-2.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827-2/query-issue827-2.2.update.aql
new file mode 100644
index 0000000..ff9e419
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827-2/query-issue827-2.2.update.aql
@@ -0,0 +1,13 @@
+/*
+ * Description  : This test case is to verify the fix for issue827
+ * https://code.google.com/p/asterixdb/issues/detail?id=827
+ * Expected Res : SUCCESS
+ * Date         : 16th Nov. 2014
+ */
+
+use dataverse tpch;
+
+load dataset LineItem 
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/tpch0.001/lineitem.tbl"),("format"="delimited-text"),("delimiter"="|")) pre-sorted;
+
diff --git a/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827-2/query-issue827-2.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827-2/query-issue827-2.3.query.aql
new file mode 100644
index 0000000..90fc4da
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827-2/query-issue827-2.3.query.aql
@@ -0,0 +1,27 @@
+/*
+ * Description  : This test case is to verify the fix for issue827
+ * https://code.google.com/p/asterixdb/issues/detail?id=827
+ * Expected Res : SUCCESS
+ * Date         : 3rd Dec. 2014
+ */
+
+use dataverse tpch;
+
+let $qty := for $i in dataset('LineItem') where $i.l_shipdate <= '1998-09-02' return $i.l_quantity
+let $base_price := for $i in dataset('LineItem') return $i.l_extendedprice
+let $disc_price := for $i in dataset('LineItem') return $i.l_extendedprice * (1 - $i.l_discount)
+let $charge := for $i in dataset('LineItem') return $i.l_extendedprice * (1 - $i.l_discount) * (1 + $i.l_tax)
+let $price := for $i in dataset('LineItem') return $i.l_extendedprice
+let $disc := for $i in dataset('LineItem') return $i.l_discount
+let $order := for $l in dataset('LineItem') return $l
+return {
+  "sum_qty_partial": sum($qty),
+  "sum_base_price": sum($base_price),
+  "sum_disc_price": sum($disc_price),
+  "sum_charge": sum($charge),
+  "ave_qty": avg($qty),
+  "ave_price": avg($price),
+  "ave_disc": avg($disc),
+  "count_order": count($order)
+}
+
diff --git a/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827/query-issue827.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827/query-issue827.1.ddl.aql
index 7ae9798..a9ca205 100644
--- a/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827/query-issue827.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827/query-issue827.1.ddl.aql
@@ -11,21 +11,21 @@
 use dataverse tpch;
 
 create type LineItemType as closed {
-  l_orderkey: int32, 
-  l_partkey: int32, 
-  l_suppkey: int32, 
-  l_linenumber: int32, 
+  l_orderkey: int32,
+  l_partkey: int32,
+  l_suppkey: int32,
+  l_linenumber: int32,
   l_quantity: double, 
   l_extendedprice: double,
   l_discount: double, 
   l_tax: double,
-  l_returnflag: string, 
-  l_linestatus: string, 
+  l_returnflag: string,
+  l_linestatus: string,
   l_shipdate: string,
-  l_commitdate: string, 
-  l_receiptdate: string, 
-  l_shipinstruct: string, 
-  l_shipmode: string, 
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
   l_comment: string
 }
 
diff --git a/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827/query-issue827.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827/query-issue827.3.query.aql
index 2a025b3..5d7b18e 100644
--- a/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827/query-issue827.3.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/tpch/query-issue827/query-issue827.3.query.aql
@@ -8,8 +8,9 @@
 use dataverse tpch;
 
 let $quantities := for $l in dataset('LineItem') return $l.l_quantity
-//let $extendedprices := for $l in dataset('LineItem') return $l.l_extendedprice
+let $extendedprices := for $l in dataset('LineItem') return $l.l_extendedprice
 return {
-  "count_cheaps": count($quantities)
-  //"count_expensives": sum(for $e in $extendedprices return $e)
+  "count_cheaps": count($quantities),
+  "count_expensives": sum(for $e in $extendedprices return $e)
 }
+
diff --git a/asterix-app/src/test/resources/runtimets/results/tpch/query-issue827-2/query-issue827-2.1.adm b/asterix-app/src/test/resources/runtimets/results/tpch/query-issue827-2/query-issue827-2.1.adm
new file mode 100644
index 0000000..c6228e2
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/tpch/query-issue827-2/query-issue827-2.1.adm
@@ -0,0 +1,2 @@
+[ { "sum_qty_partial": 150194.0d, "sum_base_price": 1.5277439838000003E8d, "sum_disc_price": 1.4517182996390015E8d, "sum_charge": 1.5100895558728895E8d, "ave_qty": 25.39634764964491d, "ave_price": 25441.198731057455d, "ave_disc": 0.050031640299750366d, "count_order": 6005i64 }
+ ]
diff --git a/asterix-app/src/test/resources/runtimets/results/tpch/query-issue827/query-issue810.1.adm b/asterix-app/src/test/resources/runtimets/results/tpch/query-issue827/query-issue810.1.adm
deleted file mode 100644
index e0be478..0000000
--- a/asterix-app/src/test/resources/runtimets/results/tpch/query-issue827/query-issue810.1.adm
+++ /dev/null
@@ -1,5 +0,0 @@
-[ { "l_returnflag": "A", "l_linestatus": "F", "count_cheaps": 680i64, "count_expensives": 798i64 }
-, { "l_returnflag": "N", "l_linestatus": "F", "count_cheaps": 12i64, "count_expensives": 26i64 }
-, { "l_returnflag": "N", "l_linestatus": "O", "count_cheaps": 1345i64, "count_expensives": 1596i64 }
-, { "l_returnflag": "R", "l_linestatus": "F", "count_cheaps": 679i64, "count_expensives": 778i64 }
- ]
diff --git a/asterix-app/src/test/resources/runtimets/results/tpch/query-issue827/query-issue827.1.adm b/asterix-app/src/test/resources/runtimets/results/tpch/query-issue827/query-issue827.1.adm
new file mode 100644
index 0000000..ef9d46c
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/tpch/query-issue827/query-issue827.1.adm
@@ -0,0 +1,2 @@
+[ { "count_cheaps": 6005i64, "count_expensives": 1.5277439838E8d }
+ ]
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index c9da615..6fdef2a 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -4556,6 +4556,16 @@
                 <output-dir compare="Text">query-issue810-3</output-dir>
             </compilation-unit>
         </test-case>
+        <test-case FilePath="tpch">
+            <compilation-unit name="query-issue827">
+                <output-dir compare="Text">query-issue827</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="tpch">
+            <compilation-unit name="query-issue827-2">
+                <output-dir compare="Text">query-issue827-2</output-dir>
+            </compilation-unit>
+        </test-case>
     </test-group>
     <test-group name="tpch-sql-like">
         <test-case FilePath="tpch-sql-like">