Merge branch 'master' into salsubaiee/master_fix_asterix_issue_460
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
index b1da831..fed1a15 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
@@ -28,7 +28,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -50,15 +49,12 @@
@Override
public void recomputeSchema() {
- if (schema == null) {
- schema = new ArrayList<LogicalVariable>();
- }
- schema.clear();
- for (Mutable<ILogicalExpression> eRef : expressions) {
- ILogicalExpression e = eRef.getValue();
- if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression v = (VariableReferenceExpression) e;
- schema.add(v.getVariableReference());
+ schema = new ArrayList<LogicalVariable>();
+ schema.addAll(this.getDistinctByVarList());
+ List<LogicalVariable> inputSchema = inputs.get(0).getValue().getSchema();
+ for (LogicalVariable var : inputSchema) {
+ if (!schema.contains(var)) {
+ schema.add(var);
}
}
}
@@ -69,12 +65,16 @@
@Override
public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
throws AlgebricksException {
- for (Mutable<ILogicalExpression> eRef : expressions) {
- ILogicalExpression e = eRef.getValue();
- if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression v = (VariableReferenceExpression) e;
- target.addVariable(v.getVariableReference());
- }
+ /** make sure distinct key vars laid-out first */
+ for (LogicalVariable keyVar : getDistinctByVarList()) {
+ target.addVariable(keyVar);
+ }
+ /** add other source vars */
+ for (IOperatorSchema srcSchema : sources) {
+ for (LogicalVariable srcVar : srcSchema)
+ if (target.findVariable(srcVar) < 0) {
+ target.addVariable(srcVar);
+ }
}
}
};
@@ -128,16 +128,7 @@
@Override
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
- IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMetadataProvider());
- IVariableTypeEnvironment childEnv = ctx.getOutputTypeEnvironment(inputs.get(0).getValue());
- for (Mutable<ILogicalExpression> exprRef : expressions) {
- ILogicalExpression expr = exprRef.getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;
- env.setVarType(varRefExpr.getVariableReference(), childEnv.getType(expr));
- }
- }
- return env;
+ return createPropagatingAllInputsTypeEnvironment(ctx);
}
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 0de9652..5225ac7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
@@ -85,11 +87,17 @@
@Override
public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
- for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {
- ILogicalExpression expr = exprRef.getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;
- schemaVariables.add(varRefExpr.getVariableReference());
+ List<LogicalVariable> allLiveVars = new ArrayList<LogicalVariable>();
+ for (Mutable<ILogicalOperator> c : op.getInputs()) {
+ VariableUtilities.getLiveVariables(c.getValue(), allLiveVars);
+ }
+ VariableUtilities.getProducedVariables(op, allLiveVars);
+ /** put distinct vars first */
+ schemaVariables.addAll(op.getDistinctByVarList());
+ /** then other live vars */
+ for (LogicalVariable var : allLiveVars) {
+ if (!schemaVariables.contains(var)) {
+ schemaVariables.add(var);
}
}
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index 29af97c..b5656cd 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -105,14 +105,24 @@
fdColumns[j++] = inputSchemas[0].findVariable(v);
}
}
+ int[] keysAndDecs = new int[keys.length + fdColumns.length];
+ for (int i = 0; i < keys.length; i++) {
+ keysAndDecs[i] = keys[i];
+ }
+ for (int i = 0; i < fdColumns.length; i++) {
+ keysAndDecs[i + keys.length] = fdColumns[i];
+ }
+
IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
columnList, context.getTypeEnvironment(op), context);
IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {};
IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(
- aggFactories, keys, fdColumns);
+ aggFactories, keysAndDecs);
- RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
- PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
+ RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+ context);
+ /** make fd columns part of the key but the comparator only compares the distinct key columns */
+ PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keysAndDecs,
comparatorFactories, aggregatorFactory, recordDescriptor);
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index 7bc150a..00670fa 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -45,6 +45,8 @@
public class EnforceOrderByAfterSubplan implements IAlgebraicRewriteRule {
/** a set of order-breaking operators */
private final Set<LogicalOperatorTag> orderBreakingOps = new HashSet<LogicalOperatorTag>();
+ /** a set of order-sensitive operators */
+ private final Set<LogicalOperatorTag> orderSensitiveOps = new HashSet<LogicalOperatorTag>();
public EnforceOrderByAfterSubplan() {
/** add operators that break the ordering */
@@ -52,6 +54,9 @@
orderBreakingOps.add(LogicalOperatorTag.LEFTOUTERJOIN);
orderBreakingOps.add(LogicalOperatorTag.UNIONALL);
orderBreakingOps.add(LogicalOperatorTag.AGGREGATE);
+
+ /** add operators that are sensitive to the ordering */
+ orderSensitiveOps.add(LogicalOperatorTag.LIMIT);
}
@Override
@@ -90,19 +95,25 @@
* duplicate them on-top-of the subplan operator
*/
boolean foundTarget = true;
- AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ boolean orderSensitive = false;
+ Mutable<ILogicalOperator> childRef = op.getInputs().get(0);
+ AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
while (child.getOperatorTag() != LogicalOperatorTag.ORDER) {
context.addToDontApplySet(this, child);
if (orderBreakingOps.contains(child.getOperatorTag())) {
foundTarget = false;
break;
}
+ if (orderSensitiveOps.contains(child.getOperatorTag())) {
+ orderSensitive = true;
+ }
List<Mutable<ILogicalOperator>> childInputs = child.getInputs();
if (childInputs == null || childInputs.size() > 2 || childInputs.size() < 1) {
foundTarget = false;
break;
} else {
- child = (AbstractLogicalOperator) childInputs.get(0).getValue();
+ childRef = childInputs.get(0);
+ child = (AbstractLogicalOperator) childRef.getValue();
}
}
/** the target order-by operator has not been found. */
@@ -110,7 +121,7 @@
return false;
}
- /** duplicate the order-by operator and insert on-top-of the subplan operator */
+ /** copy the original order-by operator and insert on-top-of the subplan operator */
context.addToDontApplySet(this, child);
OrderOperator sourceOrderOp = (OrderOperator) child;
List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = deepCopyOrderAndExpression(sourceOrderOp
@@ -120,6 +131,11 @@
inputs.set(i, new MutableObject<ILogicalOperator>(newOrderOp));
newOrderOp.getInputs().add(inputOpRef);
context.computeAndSetTypeEnvironmentForOperator(newOrderOp);
+
+ if (!orderSensitive) {
+ /** remove the original order-by */
+ childRef.setValue(sourceOrderOp.getInputs().get(0).getValue());
+ }
changed = true;
}
return changed;
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 83925cc..76b6fcf 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -34,8 +34,7 @@
private static final long serialVersionUID = 1L;
private IAggregateEvaluatorFactory[] aggFactories;
- public SimpleAlgebricksAccumulatingAggregatorFactory(IAggregateEvaluatorFactory[] aggFactories, int[] keys,
- int[] fdColumns) {
+ public SimpleAlgebricksAccumulatingAggregatorFactory(IAggregateEvaluatorFactory[] aggFactories, int[] keys) {
this.aggFactories = aggFactories;
}
diff --git a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 3de0966..e3cd7d8 100644
--- a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -493,7 +493,7 @@
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) });
IAggregateEvaluatorFactory[] aggFuns = new IAggregateEvaluatorFactory[] { new TupleCountAggregateFunctionFactory() };
IAggregatorDescriptorFactory aggFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aggFuns,
- new int[] { 3 }, new int[] {});
+ new int[] { 3 });
HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(spec, new int[] { 3 }, tpcf,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
aggFactory, gbyDesc, 1024);