Merge branch 'master' into salsubaiee/master_fix_asterix_issue_460
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
index b1da831..fed1a15 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
@@ -28,7 +28,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;

 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;

 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;

-import edu.uci.ics.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;

 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;

 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;

 

@@ -50,15 +49,12 @@
 

     @Override

     public void recomputeSchema() {

-        if (schema == null) {

-            schema = new ArrayList<LogicalVariable>();

-        }

-        schema.clear();

-        for (Mutable<ILogicalExpression> eRef : expressions) {

-            ILogicalExpression e = eRef.getValue();

-            if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

-                VariableReferenceExpression v = (VariableReferenceExpression) e;

-                schema.add(v.getVariableReference());

+        schema = new ArrayList<LogicalVariable>();

+        schema.addAll(this.getDistinctByVarList());

+        List<LogicalVariable> inputSchema = inputs.get(0).getValue().getSchema();

+        for (LogicalVariable var : inputSchema) {

+            if (!schema.contains(var)) {

+                schema.add(var);

             }

         }

     }

@@ -69,12 +65,16 @@
             @Override

             public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)

                     throws AlgebricksException {

-                for (Mutable<ILogicalExpression> eRef : expressions) {

-                    ILogicalExpression e = eRef.getValue();

-                    if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

-                        VariableReferenceExpression v = (VariableReferenceExpression) e;

-                        target.addVariable(v.getVariableReference());

-                    }

+                /** make sure distinct key vars laid-out first */

+                for (LogicalVariable keyVar : getDistinctByVarList()) {

+                    target.addVariable(keyVar);

+                }

+                /** add other source vars */

+                for (IOperatorSchema srcSchema : sources) {

+                    for (LogicalVariable srcVar : srcSchema)

+                        if (target.findVariable(srcVar) < 0) {

+                            target.addVariable(srcVar);

+                        }

                 }

             }

         };

@@ -128,16 +128,7 @@
 

     @Override

     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {

-        IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMetadataProvider());

-        IVariableTypeEnvironment childEnv = ctx.getOutputTypeEnvironment(inputs.get(0).getValue());

-        for (Mutable<ILogicalExpression> exprRef : expressions) {

-            ILogicalExpression expr = exprRef.getValue();

-            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

-                VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;

-                env.setVarType(varRefExpr.getVariableReference(), childEnv.getType(expr));

-            }

-        }

-        return env;

+        return createPropagatingAllInputsTypeEnvironment(ctx);

     }

 

 }

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 0de9652..5225ac7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -14,7 +14,9 @@
  */

 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;

 

+import java.util.ArrayList;

 import java.util.Collection;

+import java.util.List;

 

 import org.apache.commons.lang3.mutable.Mutable;

 

@@ -85,11 +87,17 @@
 

     @Override

     public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {

-        for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {

-            ILogicalExpression expr = exprRef.getValue();

-            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

-                VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;

-                schemaVariables.add(varRefExpr.getVariableReference());

+        List<LogicalVariable> allLiveVars = new ArrayList<LogicalVariable>();

+        for (Mutable<ILogicalOperator> c : op.getInputs()) {

+            VariableUtilities.getLiveVariables(c.getValue(), allLiveVars);

+        }

+        VariableUtilities.getProducedVariables(op, allLiveVars);

+        /** put distinct vars first */

+        schemaVariables.addAll(op.getDistinctByVarList());

+        /** then other live vars */

+        for (LogicalVariable var : allLiveVars) {

+            if (!schemaVariables.contains(var)) {

+                schemaVariables.add(var);

             }

         }

         return null;

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index 29af97c..b5656cd 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -105,14 +105,24 @@
                 fdColumns[j++] = inputSchemas[0].findVariable(v);
             }
         }
+        int[] keysAndDecs = new int[keys.length + fdColumns.length];
+        for (int i = 0; i < keys.length; i++) {
+            keysAndDecs[i] = keys[i];
+        }
+        for (int i = 0; i < fdColumns.length; i++) {
+            keysAndDecs[i + keys.length] = fdColumns[i];
+        }
+
         IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
                 columnList, context.getTypeEnvironment(op), context);
         IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {};
         IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(
-                aggFactories, keys, fdColumns);
+                aggFactories, keysAndDecs);
 
-        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
-        PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
+        /** make fd columns part of the key but the comparator only compares the distinct key columns */
+        PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keysAndDecs,
                 comparatorFactories, aggregatorFactory, recordDescriptor);
 
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index 7bc150a..00670fa 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -45,6 +45,8 @@
 public class EnforceOrderByAfterSubplan implements IAlgebraicRewriteRule {
     /** a set of order-breaking operators */
     private final Set<LogicalOperatorTag> orderBreakingOps = new HashSet<LogicalOperatorTag>();
+    /** a set of order-sensitive operators */
+    private final Set<LogicalOperatorTag> orderSensitiveOps = new HashSet<LogicalOperatorTag>();
 
     public EnforceOrderByAfterSubplan() {
         /** add operators that break the ordering */
@@ -52,6 +54,9 @@
         orderBreakingOps.add(LogicalOperatorTag.LEFTOUTERJOIN);
         orderBreakingOps.add(LogicalOperatorTag.UNIONALL);
         orderBreakingOps.add(LogicalOperatorTag.AGGREGATE);
+
+        /** add operators that are sensitive to the ordering */
+        orderSensitiveOps.add(LogicalOperatorTag.LIMIT);
     }
 
     @Override
@@ -90,19 +95,25 @@
              * duplicate them on-top-of the subplan operator
              */
             boolean foundTarget = true;
-            AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+            boolean orderSensitive = false;
+            Mutable<ILogicalOperator> childRef = op.getInputs().get(0);
+            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
             while (child.getOperatorTag() != LogicalOperatorTag.ORDER) {
                 context.addToDontApplySet(this, child);
                 if (orderBreakingOps.contains(child.getOperatorTag())) {
                     foundTarget = false;
                     break;
                 }
+                if (orderSensitiveOps.contains(child.getOperatorTag())) {
+                    orderSensitive = true;
+                }
                 List<Mutable<ILogicalOperator>> childInputs = child.getInputs();
                 if (childInputs == null || childInputs.size() > 2 || childInputs.size() < 1) {
                     foundTarget = false;
                     break;
                 } else {
-                    child = (AbstractLogicalOperator) childInputs.get(0).getValue();
+                    childRef = childInputs.get(0);
+                    child = (AbstractLogicalOperator) childRef.getValue();
                 }
             }
             /** the target order-by operator has not been found. */
@@ -110,7 +121,7 @@
                 return false;
             }
 
-            /** duplicate the order-by operator and insert on-top-of the subplan operator */
+            /** copy the original order-by operator and insert on-top-of the subplan operator */
             context.addToDontApplySet(this, child);
             OrderOperator sourceOrderOp = (OrderOperator) child;
             List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = deepCopyOrderAndExpression(sourceOrderOp
@@ -120,6 +131,11 @@
             inputs.set(i, new MutableObject<ILogicalOperator>(newOrderOp));
             newOrderOp.getInputs().add(inputOpRef);
             context.computeAndSetTypeEnvironmentForOperator(newOrderOp);
+
+            if (!orderSensitive) {
+                /** remove the original order-by */
+                childRef.setValue(sourceOrderOp.getInputs().get(0).getValue());
+            }
             changed = true;
         }
         return changed;
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 83925cc..76b6fcf 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -34,8 +34,7 @@
     private static final long serialVersionUID = 1L;
     private IAggregateEvaluatorFactory[] aggFactories;
 
-    public SimpleAlgebricksAccumulatingAggregatorFactory(IAggregateEvaluatorFactory[] aggFactories, int[] keys,
-            int[] fdColumns) {
+    public SimpleAlgebricksAccumulatingAggregatorFactory(IAggregateEvaluatorFactory[] aggFactories, int[] keys) {
         this.aggFactories = aggFactories;
     }
 
diff --git a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 3de0966..e3cd7d8 100644
--- a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -493,7 +493,7 @@
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) });
         IAggregateEvaluatorFactory[] aggFuns = new IAggregateEvaluatorFactory[] { new TupleCountAggregateFunctionFactory() };
         IAggregatorDescriptorFactory aggFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aggFuns,
-                new int[] { 3 }, new int[] {});
+                new int[] { 3 });
         HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(spec, new int[] { 3 }, tpcf,
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
                 aggFactory, gbyDesc, 1024);