ASTERIXDB-1005, ASTERIXDB-1263: clean up subplan flattening, including:
1. Fixed the data property propagation in HashJoin, NestedLoopJoin, PreClusteredGroupBy, and BroadcastExchange;
2. Fixed race conditions in SplitOperatorDescriptor;
3. Added a top-down pass for JobBuilder to set location constraints (the new pass ordering is sketched below);
4. Fixed AbstractIntroduceGroupByCombinerRule for general cases.
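
For item 3, the constraint assignment in JobBuilder now runs three passes over the
job roots instead of a single DFS. The sketch below mirrors the JobBuilder hunk in
this patch (method and variable names are taken from the patch); the explanatory
comments are added here rather than in the code:

    // 1) Bottom-up pass: derive constraints that follow from sources and from
    //    SAME_COUNT connectors, without yet falling back to the cluster locations.
    for (OperatorDescriptorId rootId : roots) {
        setPartitionConstraintsBottomup(rootId, tgtConstraints, null, false);
    }
    // 2) Top-down pass: push a consumer's constraint down to still-unconstrained
    //    producers across SAME_COUNT connectors.
    for (OperatorDescriptorId rootId : roots) {
        setPartitionConstraintsTopdown(rootId, tgtConstraints, null);
    }
    // 3) Final bottom-up pass: operators that are still unconstrained default to
    //    the full cluster locations.
    for (OperatorDescriptorId rootId : roots) {
        setPartitionConstraintsBottomup(rootId, tgtConstraints, null, true);
    }
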
Change-Id: I0197dc879cf983577e63ea5c047144966c0f7a3c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/572
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index a2ed2ae..813e58f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -41,6 +41,7 @@
public abstract LogicalVariable newVar();
+ @Override
public abstract IMetadataProvider<?, ?> getMetadataProvider();
public abstract void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider);
@@ -53,7 +54,7 @@
* returns true if op1 and op2 have already been compared
*/
public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
-
+
public abstract void removeFromAlreadyCompared(ILogicalOperator op1);
public abstract void addNotToBeInlinedVar(LogicalVariable var);
@@ -72,6 +73,8 @@
public abstract List<FunctionalDependency> getFDList(ILogicalOperator op);
+ public void clearAllFDAndEquivalenceClasses();
+
public abstract void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v);
public abstract ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op);
@@ -89,6 +92,6 @@
public abstract void computeAndSetTypeEnvironmentForOperator(ILogicalOperator op) throws AlgebricksException;
public abstract void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars);
-
+
public abstract LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java
deleted file mode 100644
index d8089bd..0000000
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
-import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
-
-/**
- * This visitor is to add back variables that are killed in the query plan rooted at an input operator.
- * After visiting, it also provides a variable map for variables that have been
- * mapped in the query plan, e.g., by group-by, assign, and union.
- */
-class EnforceVariablesVisitor implements IQueryOperatorVisitor<ILogicalOperator, Collection<LogicalVariable>> {
- private final IOptimizationContext context;
- private final Map<LogicalVariable, LogicalVariable> inputVarToOutputVarMap = new HashMap<>();
-
- public EnforceVariablesVisitor(IOptimizationContext context) {
- this.context = context;
- }
-
- public Map<LogicalVariable, LogicalVariable> getInputVariableToOutputVariableMap() {
- return inputVarToOutputVarMap;
- }
-
- @Override
- public ILogicalOperator visitAggregateOperator(AggregateOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return rewriteAggregateOperator(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op,
- Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
- return rewriteAggregateOperator(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
- Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitGroupByOperator(GroupByOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- Set<LogicalVariable> liveVars = new HashSet<>();
- VariableUtilities.getLiveVariables(op, liveVars);
- varsToRecover.removeAll(liveVars);
-
- // Maps group by key variables if the corresponding expressions are VariableReferenceExpressions.
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> keyVarExprRef : op.getGroupByList()) {
- ILogicalExpression expr = keyVarExprRef.second.getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
- LogicalVariable sourceVar = varExpr.getVariableReference();
- updateVarMapping(sourceVar, keyVarExprRef.first);
- varsToRecover.remove(sourceVar);
- }
- }
-
- for (LogicalVariable varToRecover : varsToRecover) {
- // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
- // where the varsToRecover forms a candidate key which can uniquely identify a tuple out of the nested-tuple-source.
- op.getDecorList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(null,
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varToRecover))));
- }
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitLimitOperator(LimitOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op,
- Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
- Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitOrderOperator(OrderOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitAssignOperator(AssignOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- List<Mutable<ILogicalExpression>> assignedExprRefs = op.getExpressions();
- List<LogicalVariable> assignedVars = op.getVariables();
-
- // Maps assigning variables if assignment expressions are VariableReferenceExpressions.
- for (int index = 0; index < assignedVars.size(); ++index) {
- ILogicalExpression expr = assignedExprRefs.get(index).getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
- LogicalVariable sourceVar = varExpr.getVariableReference();
- updateVarMapping(sourceVar, assignedVars.get(index));
- varsToRecover.remove(sourceVar);
- }
- }
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitSelectOperator(SelectOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitProjectOperator(ProjectOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- varsToRecover.removeAll(op.getVariables());
- // Adds all missing variables that should propagates up.
- op.getVariables().addAll(varsToRecover);
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op,
- Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitScriptOperator(ScriptOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- throw new UnsupportedOperationException("Script operators in a subplan are not supported!");
- }
-
- @Override
- public ILogicalOperator visitSubplanOperator(SubplanOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitUnionOperator(UnionAllOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- // Update the variable mappings
- List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varTriples = op.getVariableMappings();
- for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varTriples) {
- updateVarMapping(triple.second, triple.first);
- updateVarMapping(triple.third, triple.first);
- }
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitUnnestOperator(UnnestOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- Set<LogicalVariable> liveVars = new HashSet<>();
- VariableUtilities.getLiveVariables(op, liveVars);
- varsToRecover.remove(liveVars);
- if (!varsToRecover.isEmpty()) {
- op.setPropagatesInput(true);
- return visitsInputs(op, varsToRecover);
- }
- return op;
- }
-
- @Override
- public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitDistinctOperator(DistinctOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- @Override
- public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op,
- Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
- Set<LogicalVariable> liveVars = new HashSet<>();
- VariableUtilities.getLiveVariables(op, liveVars);
- varsToRecover.retainAll(liveVars);
- if (!varsToRecover.isEmpty()) {
- op.setPropagateInput(true);
- return visitsInputs(op, varsToRecover);
- }
- return op;
- }
-
- @Override
- public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- return visitsInputs(op, varsToRecover);
- }
-
- /**
- * Wraps an AggregateOperator or RunningAggregateOperator with a group-by operator where
- * the group-by keys are variables in varsToRecover.
- * Note that the function here prevents this visitor being used to rewrite arbitrary query plans.
- * Instead, it could only be used for rewriting a nested plan within a subplan operator.
- *
- * @param op
- * the logical operator for aggregate or running aggregate.
- * @param varsToRecover
- * the set of variables that needs to preserve.
- * @return the wrapped group-by operator if {@code varsToRecover} is not empty, and {@code op} otherwise.
- * @throws AlgebricksException
- */
- private ILogicalOperator rewriteAggregateOperator(ILogicalOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- Set<LogicalVariable> liveVars = new HashSet<>();
- VariableUtilities.getLiveVariables(op, liveVars);
- varsToRecover.removeAll(liveVars);
-
- GroupByOperator gbyOp = new GroupByOperator();
- for (LogicalVariable varToRecover : varsToRecover) {
- // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
- // where the varsToRecover forms a candidate key which can uniquely identify a tuple out of the nested-tuple-source.
- LogicalVariable newVar = context.newVar();
- gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varToRecover))));
- updateVarMapping(varToRecover, newVar);
- }
-
- NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(gbyOp));
- op.getInputs().clear();
- op.getInputs().add(new MutableObject<ILogicalOperator>(nts));
-
- ILogicalOperator inputOp = op.getInputs().get(0).getValue();
- ILogicalPlan nestedPlan = new ALogicalPlanImpl();
- nestedPlan.getRoots().add(new MutableObject<ILogicalOperator>(op));
- gbyOp.getNestedPlans().add(nestedPlan);
- gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
-
- OperatorManipulationUtil.computeTypeEnvironmentBottomUp(op, context);
- return visitsInputs(gbyOp, varsToRecover);
- }
-
- private ILogicalOperator visitsInputs(ILogicalOperator op, Collection<LogicalVariable> varsToRecover)
- throws AlgebricksException {
- if (op.getInputs().size() == 0 || varsToRecover.isEmpty()) {
- return op;
- }
- Set<LogicalVariable> producedVars = new HashSet<>();
- VariableUtilities.getProducedVariables(op, producedVars);
- varsToRecover.removeAll(producedVars);
- if (!varsToRecover.isEmpty()) {
- if (op.getInputs().size() == 1) {
- // Deals with single input operators.
- ILogicalOperator newOp = op.getInputs().get(0).getValue().accept(this, varsToRecover);
- op.getInputs().get(0).setValue(newOp);
- } else {
- // Deals with multi-input operators.
- for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
- ILogicalOperator child = childRef.getValue();
- Set<LogicalVariable> varsToRecoverInChild = new HashSet<>();
- VariableUtilities.getProducedVariablesInDescendantsAndSelf(child, varsToRecoverInChild);
- // Obtains the variables that this particular child should propagate.
- varsToRecoverInChild.retainAll(varsToRecover);
- ILogicalOperator newChild = child.accept(this, varsToRecoverInChild);
- childRef.setValue(newChild);
- }
- }
- }
- return op;
- }
-
- private void updateVarMapping(LogicalVariable oldVar, LogicalVariable newVar) {
- if (oldVar.equals(newVar)) {
- return;
- }
- LogicalVariable mappedVar = newVar;
- if (inputVarToOutputVarMap.containsKey(newVar)) {
- mappedVar = inputVarToOutputVarMap.get(newVar);
- inputVarToOutputVarMap.remove(newVar);
- }
- inputVarToOutputVarMap.put(oldVar, mappedVar);
- }
-
-}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
index aa9848c..b86e8e9 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
@@ -22,7 +22,6 @@
import java.util.Map;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -45,14 +44,16 @@
throws AlgebricksException {
List<Mutable<ILogicalOperator>> inputs1 = op.getInputs();
List<Mutable<ILogicalOperator>> inputs2 = arg.getInputs();
- if (inputs1.size() != inputs2.size())
- return Boolean.FALSE;
+ if (inputs1.size() != inputs2.size()) {
+ return false;
+ }
for (int i = 0; i < inputs1.size(); i++) {
ILogicalOperator input1 = inputs1.get(i).getValue();
ILogicalOperator input2 = inputs2.get(i).getValue();
boolean isomorphic = isOperatorIsomorphicPlanSegment(input1, input2);
- if (!isomorphic)
- return Boolean.FALSE;
+ if (!isomorphic) {
+ return false;
+ }
}
return IsomorphismUtilities.isOperatorIsomorphic(op, arg);
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
index f0bcd34..da252d4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -52,6 +52,9 @@
}
public ILogicalExpression deepCopy(ILogicalExpression expr) throws AlgebricksException {
+ if (expr == null) {
+ return null;
+ }
return expr.accept(this, null);
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index ec7d7fe..2a92ead 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -348,7 +348,7 @@
@Override
public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, ILogicalOperator arg)
throws AlgebricksException {
- NestedTupleSourceOperator opCopy = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(arg));
+ NestedTupleSourceOperator opCopy = new NestedTupleSourceOperator(op.getDataSourceReference());
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 74e74a6..5cf30c7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -134,7 +134,7 @@
@Override
public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
throws AlgebricksException {
- return new NestedTupleSourceOperator(null);
+ return new NestedTupleSourceOperator(op.getDataSourceReference());
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index ddcf6c8..4b791c1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -436,7 +436,9 @@
return;
}
IVariableTypeEnvironment env = ctx.getOutputTypeEnvironment(op);
- env.substituteProducedVariable(arg.first, arg.second);
+ if (env != null) {
+ env.substituteProducedVariable(arg.first, arg.second);
+ }
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index f80e1cd..e82b4f7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -22,7 +22,6 @@
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -167,6 +166,9 @@
}
break;
}
+ case RANDOM_PARTITION_EXCHANGE: {
+ break;
+ }
default: {
throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 70eb544..0352f83 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -28,10 +28,8 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class VariableUtilities {
@@ -54,6 +52,14 @@
op.accept(visitor, null);
}
+ public static void getSubplanLocalLiveVariables(ILogicalOperator op, Collection<LogicalVariable> liveVariables)
+ throws AlgebricksException {
+ VariableUtilities.getLiveVariables(op, liveVariables);
+ Set<LogicalVariable> locallyProducedVars = new HashSet<>();
+ VariableUtilities.getProducedVariablesInDescendantsAndSelf(op, locallyProducedVars);
+ liveVariables.retainAll(locallyProducedVars);
+ }
+
public static void getUsedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
throws AlgebricksException {
// DFS traversal
@@ -77,6 +83,21 @@
substituteVariables(op, v1, v2, true, ctx);
}
+ public static void substituteVariables(ILogicalOperator op, Map<LogicalVariable, LogicalVariable> varMap,
+ ITypingContext ctx) throws AlgebricksException {
+ for (Map.Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
+ VariableUtilities.substituteVariables(op, entry.getKey(), entry.getValue(), ctx);
+ }
+ }
+
+ public static void substituteVariables(ILogicalOperator op,
+ List<Pair<LogicalVariable, LogicalVariable>> oldVarNewVarMapHistory, ITypingContext ctx)
+ throws AlgebricksException {
+ for (Pair<LogicalVariable, LogicalVariable> entry : oldVarNewVarMapHistory) {
+ VariableUtilities.substituteVariables(op, entry.first, entry.second, ctx);
+ }
+ }
+
public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,
LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {
for (Mutable<ILogicalOperator> childOp : op.getInputs()) {
@@ -100,32 +121,4 @@
return varSet.equals(varArgSet);
}
- /**
- * Recursively modifies the query plan to make sure every variable in {@code varsToEnforce}
- * be part of the output schema of {@code opRef}.
- *
- * @param opRef,
- * the operator to enforce.
- * @param varsToEnforce,
- * the variables that needs to be live after the operator of {@code opRef}.
- * @param context,
- * the optimization context.
- * @return a map that maps a variable in {@code varsToEnforce} to yet-another-variable if
- * a mapping happens in the query plan under {@code opRef}, e.g., by grouping and assigning.
- * @throws AlgebricksException
- */
- public static Map<LogicalVariable, LogicalVariable> enforceVariablesInDescendantsAndSelf(
- Mutable<ILogicalOperator> opRef, Collection<LogicalVariable> varsToEnforce, IOptimizationContext context)
- throws AlgebricksException {
- Set<LogicalVariable> copiedVarsToEnforce = new HashSet<>();
- copiedVarsToEnforce.addAll(varsToEnforce);
- // Rewrites the query plan
- EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(context);
- ILogicalOperator result = opRef.getValue().accept(visitor, copiedVarsToEnforce);
- opRef.setValue(result);
- // Re-computes the type environment bottom up.
- OperatorManipulationUtil.computeTypeEnvironmentBottomUp(result, context);
- return visitor.getInputVariableToOutputVariableMap();
- }
-
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index f5ea5f1..09d2253 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -127,9 +127,8 @@
public Pair<Boolean, IPartitioningProperty> coordinateRequirements(
IPartitioningProperty requirements, IPartitioningProperty firstDeliveredPartitioning,
ILogicalOperator op, IOptimizationContext context) throws AlgebricksException {
- if (firstDeliveredPartitioning != null
- && firstDeliveredPartitioning.getPartitioningType() == requirements
- .getPartitioningType()) {
+ if (firstDeliveredPartitioning != null && firstDeliveredPartitioning
+ .getPartitioningType() == requirements.getPartitioningType()) {
switch (requirements.getPartitioningType()) {
case UNORDERED_PARTITIONED: {
UnorderedPartitionedProperty upp1 = (UnorderedPartitionedProperty) firstDeliveredPartitioning;
@@ -139,8 +138,8 @@
Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
Set<LogicalVariable> keysCurrent = uppreq.getColumnSet();
- List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent)) ? keysRightBranch
- : keysLeftBranch;
+ List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent))
+ ? keysRightBranch : keysLeftBranch;
List<LogicalVariable> keysSecond = keysFirst == keysRightBranch ? keysLeftBranch
: keysRightBranch;
for (LogicalVariable r : uppreq.getColumnSet()) {
@@ -155,8 +154,8 @@
j++;
}
if (!found) {
- throw new IllegalStateException("Did not find a variable equivalent to "
- + r + " among " + keysFirst);
+ throw new IllegalStateException("Did not find a variable equivalent to " + r
+ + " among " + keysFirst);
}
LogicalVariable v2 = keysSecond.get(j);
EquivalenceClass ecFst = eqmap.get(v2);
@@ -167,6 +166,9 @@
break;
}
}
+ if (covered.equals(set1)) {
+ break;
+ }
}
if (!covered.equals(set1)) {
throw new AlgebricksException("Could not modify " + requirements
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 66cb6b2..2c93cd4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -20,12 +20,13 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
-
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
@@ -54,6 +55,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.PropertiesUtil;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysicalOperator {
@@ -155,7 +157,18 @@
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
IPhysicalOperator pop2 = op2.getPhysicalOperator();
if (pop2 instanceof AbstractPreclusteredGroupByPOperator) {
- List<LogicalVariable> sndOrder = ((AbstractPreclusteredGroupByPOperator) pop2).getGbyColumns();
+ List<LogicalVariable> gbyColumns = ((AbstractPreclusteredGroupByPOperator) pop2)
+ .getGbyColumns();
+ List<LogicalVariable> sndOrder = new ArrayList<>();
+ sndOrder.addAll(gbyColumns);
+ Set<LogicalVariable> freeVars = new HashSet<>();
+ try {
+ OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc(op2, freeVars);
+ } catch (AlgebricksException e) {
+ throw new IllegalStateException(e);
+ }
+ // Only considers group key variables defined outside the outermost group-by operator.
+ sndOrder.retainAll(freeVars);
groupProp.getColumnSet().addAll(sndOrder);
groupProp.getPreferredOrderEnforcer().addAll(sndOrder);
goon = false;
@@ -210,9 +223,8 @@
tl.add(((VariableReferenceExpression) decorPair.second.getValue()).getVariableReference());
fdList.add(new FunctionalDependency(hd, tl));
}
- if (allOk
- && PropertiesUtil.matchLocalProperties(localProps, props,
- new HashMap<LogicalVariable, EquivalenceClass>(), fdList)) {
+ if (allOk && PropertiesUtil.matchLocalProperties(localProps, props,
+ new HashMap<LogicalVariable, EquivalenceClass>(), fdList)) {
localProps = props;
}
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
index c0e20d6..03a8666 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
@@ -18,15 +18,17 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+import java.util.ArrayList;
+
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -52,10 +54,9 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
IPartitioningProperty pp = new BroadcastPartitioningProperty(domain);
- this.deliveredProperties = new StructuralPropertiesVector(pp, op2.getDeliveredPhysicalProperties()
- .getLocalProperties());
+ // Broadcasts will destroy input local properties.
+ this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<ILocalStructuralProperty>());
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 34e3dc6..071ee72 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -18,12 +18,14 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.logging.Logger;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -34,6 +36,8 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
@@ -106,14 +110,14 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
IVariableTypeEnvironment env = context.getTypeEnvironment(op);
- IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
- keysLeftBranch, env, context);
- IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(
- keysLeftBranch, env, context);
+ IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper
+ .variablesToBinaryHashFunctionFactories(keysLeftBranch, env, context);
+ IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(keysLeftBranch,
+ env, context);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
int i = 0;
IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
@@ -173,9 +177,10 @@
case INNER: {
opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
- comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
- keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
- keysRight, keysLeft), predEvaluatorFactory);
+ comparatorFactories, recDescriptor,
+ new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
+ new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft),
+ predEvaluatorFactory);
break;
}
case LEFT_OUTER: {
@@ -185,9 +190,10 @@
}
opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
- comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
- keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
- keysRight, keysLeft), predEvaluatorFactory, true, nullWriterFactories);
+ comparatorFactories, recDescriptor,
+ new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
+ new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft),
+ predEvaluatorFactory, true, nullWriterFactories);
break;
}
default: {
@@ -209,7 +215,31 @@
@Override
protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context)
throws AlgebricksException {
- return new LinkedList<ILocalStructuralProperty>();
+ List<ILocalStructuralProperty> deliveredLocalProperties = new ArrayList<ILocalStructuralProperty>();
+ // An inner join can trigger the "role reversal" optimization, which can destroy the data properties of the probe side.
+ if (kind == JoinKind.LEFT_OUTER) {
+ AbstractLogicalOperator probeOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ IPhysicalPropertiesVector probeSideProperties = probeOp.getPhysicalOperator().getDeliveredProperties();
+ List<ILocalStructuralProperty> probeSideLocalProperties = probeSideProperties.getLocalProperties();
+ if (probeSideLocalProperties != null) {
+ // The local grouping property of the probe side is maintained, and the local ordering property
+ // of the probe side is turned into a local grouping property, provided that the grouping
+ // variables (or sort columns) in the local property contain all the join key variables of
+ // the left branch:
+ // 1. if spilling is not triggered, the ordering property is maintained and hence the local grouping property is maintained;
+ // 2. if spilling is triggered, the grouping property is still maintained although the ordering property is destroyed.
+ for (ILocalStructuralProperty property : probeSideLocalProperties) {
+ Set<LogicalVariable> groupingVars = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> leftBranchVars = new ListSet<LogicalVariable>();
+ property.getVariables(groupingVars);
+ leftBranchVars.addAll(getKeysLeftBranch());
+ if (groupingVars.containsAll(leftBranchVars)) {
+ deliveredLocalProperties.add(new LocalGroupingProperty(groupingVars));
+ }
+ }
+ }
+ }
+ return deliveredLocalProperties;
}
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index a923944..7ff15d7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -18,7 +18,6 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-import java.util.LinkedList;
import java.util.List;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -105,8 +104,11 @@
pp = IPartitioningProperty.UNPARTITIONED;
}
- List<ILocalStructuralProperty> localProps = new LinkedList<ILocalStructuralProperty>();
- this.deliveredProperties = new StructuralPropertiesVector(pp, localProps);
+ // A nested-loop join maintains the local structural properties of the probe side.
+ AbstractLogicalOperator probeOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ IPhysicalPropertiesVector probeSideProperties = probeOp.getPhysicalOperator().getDeliveredProperties();
+ List<ILocalStructuralProperty> probeSideLocalProperties = probeSideProperties.getLocalProperties();
+ this.deliveredProperties = new StructuralPropertiesVector(pp, probeSideLocalProperties);
}
@Override
@@ -125,7 +127,7 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
propagatedSchema, context);
@@ -221,8 +223,8 @@
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
- boolean result = binaryBooleanInspector
- .getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+ boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
+ p.getLength());
if (result)
return 0;
else
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
index 42e6bcf..af0087d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -47,9 +47,10 @@
this.domain = domain;
}
+ @Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
opSchema, context);
builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
@@ -62,9 +63,10 @@
return false;
}
+ @Override
public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
- ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(domain.cardinality());
+ ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory();
MToNPartitioningConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
}
@@ -77,8 +79,8 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain), op2
- .getDeliveredPhysicalProperties().getLocalProperties());
+ this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain),
+ op2.getDeliveredPhysicalProperties().getLocalProperties());
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index b4569e4..b5099b1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -239,7 +240,35 @@
for (Mutable<ILogicalOperator> children : op.getInputs()) {
computeTypeEnvironmentBottomUp(children.getValue(), context);
}
+ AbstractLogicalOperator abstractOp = (AbstractLogicalOperator) op;
+ if (abstractOp.hasNestedPlans()) {
+ for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+ for (Mutable<ILogicalOperator> rootRef : p.getRoots()) {
+ computeTypeEnvironmentBottomUp(rootRef.getValue(), context);
+ }
+ }
+ }
context.computeAndSetTypeEnvironmentForOperator(op);
}
+ /**
+ * Whether the operator <code>op</code> is, or is an ancestor of, an operator whose tag is in the set <code>tags</code>.
+ *
+ * @param op the operator to start the check from
+ * @param tags the set of logical operator tags to look for
+ * @return true if so; false otherwise.
+ */
+ public static boolean ancestorOfOperators(ILogicalOperator op, Set<LogicalOperatorTag> tags) {
+ LogicalOperatorTag opTag = op.getOperatorTag();
+ if (tags.contains(opTag)) {
+ return true;
+ }
+ for (Mutable<ILogicalOperator> children : op.getInputs()) {
+ if (ancestorOfOperators(children.getValue(), tags)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 8e254f0..d130d4c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -98,7 +98,8 @@
}
@Override
- public void contributeGraphEdge(ILogicalOperator src, int srcOutputIndex, ILogicalOperator dest, int destInputIndex) {
+ public void contributeGraphEdge(ILogicalOperator src, int srcOutputIndex, ILogicalOperator dest,
+ int destInputIndex) {
ArrayList<ILogicalOperator> outputs = outEdges.get(src);
if (outputs == null) {
outputs = new ArrayList<ILogicalOperator>();
@@ -144,7 +145,13 @@
List<OperatorDescriptorId> roots = jobSpec.getRoots();
setSpecifiedPartitionConstraints();
for (OperatorDescriptorId rootId : roots) {
- setPartitionConstraintsDFS(rootId, tgtConstraints, null);
+ setPartitionConstraintsBottomup(rootId, tgtConstraints, null, false);
+ }
+ for (OperatorDescriptorId rootId : roots) {
+ setPartitionConstraintsTopdown(rootId, tgtConstraints, null);
+ }
+ for (OperatorDescriptorId rootId : roots) {
+ setPartitionConstraintsBottomup(rootId, tgtConstraints, null, true);
}
}
@@ -161,7 +168,7 @@
}
}
- private void setPartitionConstraintsDFS(OperatorDescriptorId opId,
+ private void setPartitionConstraintsTopdown(OperatorDescriptorId opId,
Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp) {
List<IConnectorDescriptor> opInputs = jobSpec.getOperatorInputMap().get(opId);
AlgebricksPartitionConstraint opConstraint = null;
@@ -172,9 +179,39 @@
org.apache.commons.lang3.tuple.Pair<org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>, org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>> p = jobSpec
.getConnectorOperatorMap().get(cid);
IOperatorDescriptor src = p.getLeft().getLeft();
- // DFS
- setPartitionConstraintsDFS(src.getOperatorId(), tgtConstraints, opDesc);
+ TargetConstraint constraint = tgtConstraints.get(conn);
+ if (constraint != null) {
+ if (constraint == TargetConstraint.SAME_COUNT) {
+ opConstraint = partitionConstraintMap.get(opDesc);
+ if (partitionConstraintMap.get(src) == null) {
+ if (opConstraint != null) {
+ partitionConstraintMap.put(src, opConstraint);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, src,
+ opConstraint);
+ }
+ }
+ }
+ }
+ // Pre-order DFS: propagate the constraint to the producer before descending into it.
+ setPartitionConstraintsTopdown(src.getOperatorId(), tgtConstraints, opDesc);
+ }
+ }
+ }
+ private void setPartitionConstraintsBottomup(OperatorDescriptorId opId,
+ Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp,
+ boolean finalPass) {
+ List<IConnectorDescriptor> opInputs = jobSpec.getOperatorInputMap().get(opId);
+ AlgebricksPartitionConstraint opConstraint = null;
+ IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(opId);
+ if (opInputs != null) {
+ for (IConnectorDescriptor conn : opInputs) {
+ ConnectorDescriptorId cid = conn.getConnectorId();
+ org.apache.commons.lang3.tuple.Pair<org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>, org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>> p = jobSpec
+ .getConnectorOperatorMap().get(cid);
+ IOperatorDescriptor src = p.getLeft().getLeft();
+ // Post-order DFS: visit the producer first, then derive this operator's constraint.
+ setPartitionConstraintsBottomup(src.getOperatorId(), tgtConstraints, opDesc, finalPass);
TargetConstraint constraint = tgtConstraints.get(conn);
if (constraint != null) {
switch (constraint) {
@@ -200,12 +237,14 @@
opConstraint = new AlgebricksCountPartitionConstraint(1);
}
}
- if (opConstraint == null) {
+ if (opConstraint == null && finalPass) {
opConstraint = clusterLocations;
}
}
- partitionConstraintMap.put(opDesc, opConstraint);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint);
+ if (opConstraint != null) {
+ partitionConstraintMap.put(opDesc, opConstraint);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint);
+ }
}
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 16c4855..286d5c7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -67,7 +67,6 @@
private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<IAlgebraicRewriteRule, HashSet<ILogicalOperator>>();
private Map<LogicalVariable, FunctionalDependency> recordToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>();
- @SuppressWarnings("unchecked")
private IMetadataProvider metadataProvider;
private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<LogicalVariable>();
@@ -90,7 +89,8 @@
public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
- PhysicalOptimizationConfig physicalOptimizationConfig, LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
+ PhysicalOptimizationConfig physicalOptimizationConfig,
+ LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
this.varCounter = varCounter;
this.expressionEvalSizeComputer = expressionEvalSizeComputer;
this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
@@ -100,29 +100,35 @@
this.prettyPrintVisitor = prettyPrintVisitor;
}
+ @Override
public int getVarCounter() {
return varCounter;
}
+ @Override
public void setVarCounter(int varCounter) {
this.varCounter = varCounter;
}
+ @Override
public LogicalVariable newVar() {
varCounter++;
LogicalVariable var = new LogicalVariable(varCounter);
return var;
}
+ @Override
@SuppressWarnings("unchecked")
public IMetadataProvider getMetadataProvider() {
return metadataProvider;
}
+ @Override
public void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider) {
this.metadataProvider = metadataProvider;
}
+ @Override
public boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
HashSet<ILogicalOperator> operators = dontApply.get(rule);
if (operators == null) {
@@ -132,6 +138,7 @@
}
}
+ @Override
public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
HashSet<ILogicalOperator> operators = dontApply.get(rule);
if (operators == null) {
@@ -164,26 +171,30 @@
}
}
}
-
+
@Override
public void removeFromAlreadyCompared(ILogicalOperator op1) {
alreadyCompared.remove(op1);
}
+ @Override
public void addNotToBeInlinedVar(LogicalVariable var) {
notToBeInlinedVars.add(var);
}
+ @Override
public boolean shouldNotBeInlined(LogicalVariable var) {
return notToBeInlinedVars.contains(var);
}
+ @Override
public void addPrimaryKey(FunctionalDependency pk) {
assert (pk.getTail().size() == 1);
LogicalVariable recordVar = pk.getTail().get(0);
recordToPrimaryKey.put(recordVar, pk);
}
+ @Override
public List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar) {
FunctionalDependency fd = recordToPrimaryKey.get(recordVar);
if (fd == null) {
@@ -213,6 +224,12 @@
}
@Override
+ public void clearAllFDAndEquivalenceClasses() {
+ eqClassGlobalMap.clear();
+ fdGlobalMap.clear();
+ }
+
+ @Override
public ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op) {
return logicalProps.get(op);
}
@@ -232,10 +249,12 @@
return varEvalSizeEnv;
}
+ @Override
public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory() {
return mergeAggregationExpressionFactory;
}
+ @Override
public PhysicalOptimizationConfig getPhysicalOptimizationConfig() {
return physicalOptimizationConfig;
}
@@ -295,7 +314,7 @@
me.setValue(new FunctionalDependency(hd, tl));
}
}
-
+
@Override
public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor() {
return prettyPrintVisitor;
diff --git a/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java b/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java
deleted file mode 100644
index d0676cf..0000000
--- a/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-public class EnforceVariablesVisitorTest {
-
- /**
- * Tests the processing of project operator in RecoverVariablesVisitor.
- *
- * @throws Exception
- */
- @Test
- public void testProject() throws Exception {
- // Constructs the input operator.
- LogicalVariable var = new LogicalVariable(1);
- List<LogicalVariable> inputVarList = new ArrayList<>();
- inputVarList.add(var);
- ProjectOperator projectOp = new ProjectOperator(inputVarList);
-
- // Constructs the visitor.
- IOptimizationContext mockedContext = mock(IOptimizationContext.class);
- EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
- // Calls the visitor.
- LogicalVariable varToEnforce = new LogicalVariable(2);
- ProjectOperator op = (ProjectOperator) projectOp.accept(visitor,
- Arrays.asList(new LogicalVariable[] { varToEnforce }));
-
- // Checks the result.
- List<LogicalVariable> expectedVars = Arrays.asList(new LogicalVariable[] { var, varToEnforce });
- Assert.assertEquals(expectedVars, op.getVariables());
- Assert.assertTrue(visitor.getInputVariableToOutputVariableMap().isEmpty());
- }
-
- /**
- * Tests the processing of group-by operator in RecoverVariablesVisitor.
- *
- * @throws Exception
- */
- @Test
- public void testGroupby() throws Exception {
- // Constructs the group-by operator.
- LogicalVariable keyVar = new LogicalVariable(2);
- LogicalVariable keyExprVar = new LogicalVariable(1);
- GroupByOperator gbyOp = new GroupByOperator();
- gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar,
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar))));
-
- // Constructs the visitor.
- IOptimizationContext mockedContext = mock(IOptimizationContext.class);
- EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
- // Calls the visitor.
- LogicalVariable varToEnforce = new LogicalVariable(3);
- Set<LogicalVariable> varsToEnforce = new HashSet<>();
- varsToEnforce.add(keyExprVar);
- varsToEnforce.add(varToEnforce);
- GroupByOperator op = (GroupByOperator) gbyOp.accept(visitor, varsToEnforce);
-
- // Checks the result.
- Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
- expectedVarMap.put(keyExprVar, keyVar);
- Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
- VariableReferenceExpression decorVarExpr = (VariableReferenceExpression) op.getDecorList().get(0).second
- .getValue();
- Assert.assertEquals(decorVarExpr.getVariableReference(), varToEnforce);
- }
-
- /**
- * Tests the processing of aggregate operator in RecoverVariablesVisitor.
- *
- * @throws Exception
- */
- @Test
- public void testAggregate() throws Exception {
- // Constructs the group-by operator.
- List<LogicalVariable> aggVars = new ArrayList<>();
- List<Mutable<ILogicalExpression>> aggExprRefs = new ArrayList<>();
- AggregateOperator aggOp = new AggregateOperator(aggVars, aggExprRefs);
-
- // Constructs the visitor.
- LogicalVariable var = new LogicalVariable(3);
- IOptimizationContext mockedContext = mock(IOptimizationContext.class);
- when(mockedContext.newVar()).thenReturn(var);
- EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
- // Calls the visitor.
- LogicalVariable varToEnforce = new LogicalVariable(2);
- Set<LogicalVariable> varsToEnforce = new HashSet<>();
- varsToEnforce.add(varToEnforce);
- GroupByOperator op = (GroupByOperator) aggOp.accept(visitor, varsToEnforce);
-
- // Checks the result.
- Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
- expectedVarMap.put(varToEnforce, var);
- Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
- VariableReferenceExpression keyExpr = (VariableReferenceExpression) op.getGroupByList().get(0).second
- .getValue();
- Assert.assertEquals(keyExpr.getVariableReference(), varToEnforce);
- LogicalVariable expectedGbyVar = op.getGroupByList().get(0).first;
- Assert.assertEquals(expectedGbyVar, var);
- }
-
- /**
- * Tests the processing of two serial group-by operators in RecoverVariablesVisitor.
- *
- * @throws Exception
- */
- @Test
- public void testTwoGroupbys() throws Exception {
- // Constructs the group-by operators.
- LogicalVariable keyVar = new LogicalVariable(1);
- LogicalVariable keyExprVar = new LogicalVariable(2);
- GroupByOperator gbyOp = new GroupByOperator();
- gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar,
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar))));
- LogicalVariable keyVar2 = new LogicalVariable(2);
- LogicalVariable keyExprVar2 = new LogicalVariable(3);
- GroupByOperator gbyOp2 = new GroupByOperator();
- gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar2,
- new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar2))));
- gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(gbyOp2));
-
- // Constructs the visitor.
- IOptimizationContext mockedContext = mock(IOptimizationContext.class);
- EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
- // Calls the visitor.
- LogicalVariable varToEnforce = new LogicalVariable(4);
- Set<LogicalVariable> varsToEnforce = new HashSet<>();
- varsToEnforce.add(keyExprVar2);
- varsToEnforce.add(varToEnforce);
- GroupByOperator op = (GroupByOperator) gbyOp.accept(visitor, varsToEnforce);
-
- // Checks the result.
- Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
- expectedVarMap.put(keyExprVar2, keyVar);
- Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
- VariableReferenceExpression decorVarExpr = (VariableReferenceExpression) op.getDecorList().get(0).second
- .getValue();
- Assert.assertEquals(decorVarExpr.getVariableReference(), varToEnforce);
- GroupByOperator op2 = (GroupByOperator) op.getInputs().get(0).getValue();
- VariableReferenceExpression decorVarExpr2 = (VariableReferenceExpression) op2.getDecorList().get(0).second
- .getValue();
- Assert.assertEquals(decorVarExpr2.getVariableReference(), varToEnforce);
- }
-
-}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index 9aaa3a7..2d2619e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -28,7 +28,6 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -45,6 +44,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismUtilities;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -186,7 +186,7 @@
private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan nestedPlan, GroupByOperator oldGbyOp,
GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context)
- throws AlgebricksException {
+ throws AlgebricksException {
List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
Set<SimilarAggregatesInfo> toReplaceSet = new HashSet<SimilarAggregatesInfo>();
for (Mutable<ILogicalOperator> r : nestedPlan.getRoots()) {
@@ -215,7 +215,7 @@
VariableUtilities.getProducedVariables(rootRef.getValue(), newVars);
}
- // Replaces variable exprs referring to the variables produced by newPlan by
+ // Replaces variable exprs referring to the variables produced by newPlan by
// those produced by plan.
Iterator<LogicalVariable> originalVarIter = originalVars.iterator();
Iterator<LogicalVariable> newVarIter = newVars.iterator();
@@ -246,7 +246,7 @@
private boolean tryToPushRoot(Mutable<ILogicalOperator> root, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
List<Mutable<ILogicalOperator>> toPushAccumulate, Set<SimilarAggregatesInfo> toReplaceSet)
- throws AlgebricksException {
+ throws AlgebricksException {
AbstractLogicalOperator op1 = (AbstractLogicalOperator) root.getValue();
if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
return false;
@@ -273,6 +273,11 @@
} else {
GroupByOperator nestedGby = (GroupByOperator) op3;
List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
+ Set<LogicalVariable> freeVars = new HashSet<>();
+ // Keeps only the group-by variables that are free in the nested plan, i.e., removes the ones defined inside it.
+ OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc(nestedGby, freeVars);
+ gbyVars2.retainAll(freeVars);
+
List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
concatGbyVars.addAll(gbyVars2);
for (ILogicalPlan p : nestedGby.getNestedPlans()) {
@@ -288,50 +293,99 @@
* Push the nested pipeline which provides the input to the nested group operator into newGbyOp (the combined gby op).
* The change is to fix asterixdb issue 782.
*/
- Mutable<ILogicalOperator> nestedGbyInputRef = nestedGby.getInputs().get(0);
- Mutable<ILogicalOperator> startOfPipelineRef = nestedGbyInputRef;
- if (startOfPipelineRef.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ // Finds the reference of the bottom-most operator in the pipeline that
+ // should not be pushed to the combiner group-by.
+ Mutable<ILogicalOperator> currentOpRef = new MutableObject<ILogicalOperator>(nestedGby);
+ Mutable<ILogicalOperator> bottomOpRef = findBottomOpRefStayInOldGby(currentOpRef);
+
+ // Adds the used variables in the pipeline from <code>currentOpRef</code> to <code>bottomOpRef</code>
+ // into the group-by keys for the introduced combiner group-by operator.
+ Set<LogicalVariable> usedVars = collectUsedFreeVariables(currentOpRef, bottomOpRef);
+ for (LogicalVariable usedVar : usedVars) {
+ if (!concatGbyVars.contains(usedVar)) {
+ concatGbyVars.add(usedVar);
+ }
+ }
+
+ // Retains the nested pipeline above the identified operator in the old group-by operator.
+ // Pushes the nested pipeline below the identified operator into the new (combiner) group-by operator.
+ Mutable<ILogicalOperator> oldNtsRef = findNtsRef(currentOpRef);
+ ILogicalOperator opToCombiner = bottomOpRef.getValue().getInputs().get(0).getValue();
+ if (opToCombiner.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ // No pipeline other than the aggregate operator needs to be pushed into the combiner.
return true;
}
-
- // move down the nested pipeline to find the start of the pipeline right upon the nested-tuple-source
- boolean hasIsNullFunction = OperatorPropertiesUtil.isNullTest((AbstractLogicalOperator) startOfPipelineRef
- .getValue());
- while (startOfPipelineRef.getValue().getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE) {
- startOfPipelineRef = startOfPipelineRef.getValue().getInputs().get(0);
- hasIsNullFunction = OperatorPropertiesUtil.isNullTest((AbstractLogicalOperator) startOfPipelineRef
- .getValue());
- }
- //keep the old nested-tuple-source
- Mutable<ILogicalOperator> oldNts = startOfPipelineRef.getValue().getInputs().get(0);
-
- //move down the nested op in the new gby operator
- Mutable<ILogicalOperator> newGbyNestedOpRef = toPushAccumulate.get(0);
- while (newGbyNestedOpRef.getValue().getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE) {
- newGbyNestedOpRef = newGbyNestedOpRef.getValue().getInputs().get(0);
- }
-
- //insert the pipeline before nested gby into the new (combiner) gby's nested plan on top of the nested-tuple-source
- startOfPipelineRef.getValue().getInputs().set(0, newGbyNestedOpRef.getValue().getInputs().get(0));
- newGbyNestedOpRef.getValue().getInputs().set(0, nestedGbyInputRef);
-
- //in the old gby operator, remove the nested pipeline since it is already pushed to the combiner gby
- nestedGby.getInputs().set(0, oldNts);
- List<LogicalVariable> aggProducedVars = new ArrayList<LogicalVariable>();
- VariableUtilities.getProducedVariables(toPushAccumulate.get(0).getValue(), aggProducedVars);
-
- if (hasIsNullFunction && aggProducedVars.size() != 0) {
- // if the old nested pipeline contains a not-null-check, we need to convert it to a not-system-null-check in the non-local gby
- processNullTest(context, nestedGby, aggProducedVars);
- }
-
+ bottomOpRef.getValue().getInputs().set(0, new MutableObject<ILogicalOperator>(oldNtsRef.getValue()));
+ Mutable<ILogicalOperator> newGbyNestedOpRef = findNtsRef(toPushAccumulate.get(0));
+ NestedTupleSourceOperator newNts = (NestedTupleSourceOperator) newGbyNestedOpRef.getValue();
+ newGbyNestedOpRef.setValue(opToCombiner);
+ oldNtsRef.setValue(newNts);
return true;
}
}
/**
+ * Finds the set of used free variables along the pipeline from <code>topOpRef</code> (exclusive)
+ * to <code>bottomOpRef</code> (inclusive).
+ *
+ * @param topOpRef
+ * the top of the pipeline.
+ * @param bottomOpRef
+ * the bottom of the pipeline.
+ * @return the set of used free variables.
+ * @throws AlgebricksException
+ */
+ private Set<LogicalVariable> collectUsedFreeVariables(Mutable<ILogicalOperator> topOpRef,
+ Mutable<ILogicalOperator> bottomOpRef) throws AlgebricksException {
+ Set<LogicalVariable> usedVars = new HashSet<>();
+ Mutable<ILogicalOperator> currentOpRef = topOpRef;
+ while (currentOpRef != bottomOpRef) {
+ currentOpRef = currentOpRef.getValue().getInputs().get(0);
+ VariableUtilities.getUsedVariables(currentOpRef.getValue(), usedVars);
+ }
+ Set<LogicalVariable> freeVars = new HashSet<>();
+ OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) topOpRef.getValue(), freeVars);
+ usedVars.retainAll(freeVars);
+ return usedVars;
+ }
+
+ /**
+ * Finds the reference of the nested-tuple-source operator at the bottom of the query pipeline
+ * rooted at <code>currentOpRef</code>.
+ *
+ * @param currentOpRef
+ * the root of the pipeline to search.
+ * @return the reference of the nested-tuple-source operator.
+ */
+ private Mutable<ILogicalOperator> findNtsRef(Mutable<ILogicalOperator> currentOpRef) {
+ while (currentOpRef.getValue().getInputs().size() > 0) {
+ currentOpRef = currentOpRef.getValue().getInputs().get(0);
+ }
+ return currentOpRef;
+ }
+
+ /**
+ * Finds the bottom-most operator reference in the nested query pipeline rooted at <code>currentOpRef</code>
+ * that cannot be pushed into the combiner group-by operator.
+ *
+ * @param currentOpRef
+ * the root of the pipeline to search.
+ * @return the bottom-most reference of an operator that has to stay in the original group-by operator.
+ */
+ private Mutable<ILogicalOperator> findBottomOpRefStayInOldGby(Mutable<ILogicalOperator> currentOpRef)
+ throws AlgebricksException {
+ Mutable<ILogicalOperator> bottomOpRef = currentOpRef;
+ while (currentOpRef.getValue().getInputs().size() > 0) {
+ Set<LogicalVariable> producedVars = new HashSet<>();
+ VariableUtilities.getProducedVariables(currentOpRef.getValue(), producedVars);
+ if (currentOpRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT || !producedVars.isEmpty()) {
+ bottomOpRef = currentOpRef;
+ }
+ currentOpRef = currentOpRef.getValue().getInputs().get(0);
+ }
+ return bottomOpRef;
+ }
+
+ /**
* Deal with the case where the nested plan in the combiner gby operator has a null-test before invoking aggregation functions.
- *
+ *
* @param context
* The optimization context.
* @param nestedGby
@@ -341,4 +395,4 @@
*/
protected abstract void processNullTest(IOptimizationContext context, GroupByOperator nestedGby,
List<LogicalVariable> aggregateVarsProducedByCombiner);
-}
\ No newline at end of file
+}
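
The three helpers above all walk a single-input nested pipeline: findNtsRef follows the only input until it reaches the operator with no inputs, and findBottomOpRefStayInOldGby remembers the last SELECT or variable-producing operator seen on the way down; everything below that operator is handed to the combiner. A small standalone sketch of that traversal and the rewiring in tryToPushRoot, where Node and its boolean flags are illustrative stand-ins for the Algebricks operator classes and tag checks:

// Toy single-input pipeline node standing in for an Algebricks operator.
class Node {
    final String name;
    final boolean isSelect;     // stands in for LogicalOperatorTag.SELECT
    final boolean producesVars; // stands in for VariableUtilities.getProducedVariables(...)
    Node input;                 // single input; null for the nested-tuple-source

    Node(String name, boolean isSelect, boolean producesVars) {
        this.name = name;
        this.isSelect = isSelect;
        this.producesVars = producesVars;
    }
}

public class CombinerRewireSketch {

    // Analogous to findBottomOpRefStayInOldGby: the last SELECT / producing operator
    // seen while walking down stays in the original group-by.
    static Node findBottomStay(Node root) {
        Node bottom = root;
        for (Node cur = root; cur.input != null; cur = cur.input) {
            if (cur.isSelect || cur.producesVars) {
                bottom = cur;
            }
        }
        return bottom;
    }

    // Analogous to findNtsRef: the operator with no input is the nested-tuple-source.
    static Node findLeaf(Node root) {
        Node cur = root;
        while (cur.input != null) {
            cur = cur.input;
        }
        return cur;
    }

    public static void main(String[] args) {
        // nestedGby <- select <- order <- nts
        Node nts = new Node("nts", false, false);
        Node order = new Node("order", false, false);
        Node select = new Node("select", true, false);
        Node nestedGby = new Node("nestedGby", false, true);
        order.input = nts;
        select.input = order;
        nestedGby.input = select;

        Node bottomStay = findBottomStay(nestedGby); // -> select: stays in the old group-by
        Node toCombiner = bottomStay.input;          // -> order: pushed into the combiner
        Node oldNts = findLeaf(nestedGby);           // -> nts

        // The old group-by keeps nestedGby <- select <- nts; the order operator is
        // handed to the combiner group-by, mirroring the bottomOpRef/oldNtsRef rewiring.
        bottomStay.input = oldNts;
        System.out.println("stays: " + bottomStay.name + ", pushed to combiner: " + toCombiner.name);
    }
}

Here only the operator below the bottom-most select moves into the combiner; the select and the nested group-by stay on top of the original nested-tuple-source.
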
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 2181efa..8bf1ad5 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -104,7 +104,8 @@
}
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
// wait for the physical operators to be set first
if (op.getPhysicalOperator() == null) {
@@ -166,18 +167,17 @@
List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<IPartitioningProperty>();
for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
- deliveredPartitioningPropertiesFromChildren.add(child.getDeliveredPhysicalProperties()
- .getPartitioningProperty());
+ deliveredPartitioningPropertiesFromChildren
+ .add(child.getDeliveredPhysicalProperties().getPartitioningProperty());
}
int partitioningCompatibleChild = 0;
for (int i = 0; i < op.getInputs().size(); i++) {
IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i);
- if (reqdProperties == null
- || reqdProperties[i] == null
- || reqdProperties[i].getPartitioningProperty() == null
- || deliveredPropertyFromChild == null
- || reqdProperties[i].getPartitioningProperty().getPartitioningType() != deliveredPartitioningPropertiesFromChildren
- .get(i).getPartitioningType()) {
+ if (reqdProperties == null || reqdProperties[i] == null
+ || reqdProperties[i].getPartitioningProperty() == null || deliveredPropertyFromChild == null
+ || reqdProperties[i].getPartitioningProperty()
+ .getPartitioningType() != deliveredPartitioningPropertiesFromChildren.get(i)
+ .getPartitioningType()) {
continue;
}
IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty();
@@ -248,8 +248,8 @@
AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
- AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Properties delivered by " + child.getPhysicalOperator()
- + ": " + delivered + "\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER
+ .finest(">>>> Properties delivered by " + child.getPhysicalOperator() + ": " + delivered + "\n");
IPartitioningRequirementsCoordinator prc = pr.getPartitioningCoordinator();
// Coordinates requirements by looking at the firstDeliveredPartitioning.
Pair<Boolean, IPartitioningProperty> pbpp = prc.coordinateRequirements(
@@ -258,8 +258,8 @@
IPhysicalPropertiesVector rqd = new StructuralPropertiesVector(pbpp.second,
requiredProperty.getLocalProperties());
- AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for " + child.getPhysicalOperator()
- + ": " + rqd + "\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER
+ .finest(">>>> Required properties for " + child.getPhysicalOperator() + ": " + rqd + "\n");
// The partitioning property of reqdProperties[childIndex] could be updated here because
// rqd.getPartitioningProperty() is the same object instance as requiredProperty.getPartitioningProperty().
IPhysicalPropertiesVector diff = delivered.getUnsatisfiedPropertiesFrom(rqd,
@@ -273,7 +273,8 @@
changed = true;
addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context);
- AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(childIndex).getValue());
+ AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(childIndex)
+ .getValue());
if (newChild != child) {
delivered = newChild.getDeliveredPhysicalProperties();
@@ -308,8 +309,8 @@
if (opIsRedundantSort) {
if (AlgebricksConfig.DEBUG) {
- AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Removing redundant SORT operator "
- + op.getPhysicalOperator() + "\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER
+ .fine(">>>> Removing redundant SORT operator " + op.getPhysicalOperator() + "\n");
printOp(op);
}
changed = true;
@@ -332,7 +333,7 @@
private IPhysicalPropertiesVector newPropertiesDiff(AbstractLogicalOperator newChild,
IPhysicalPropertiesVector required, boolean mayExpandPartitioningProperties, IOptimizationContext context)
- throws AlgebricksException {
+ throws AlgebricksException {
IPhysicalPropertiesVector newDelivered = newChild.getDeliveredPhysicalProperties();
Map<LogicalVariable, EquivalenceClass> newChildEqClasses = context.getEquivalenceClassMap(newChild);
@@ -343,8 +344,8 @@
newChildEqClasses = context.getEquivalenceClassMap(newChild);
newChildFDs = context.getFDList(newChild);
}
- AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for new op. "
- + newChild.getPhysicalOperator() + ": " + required + "\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER.finest(
+ ">>>> Required properties for new op. " + newChild.getPhysicalOperator() + ": " + required + "\n");
return newDelivered.getUnsatisfiedPropertiesFrom(required, mayExpandPartitioningProperties, newChildEqClasses,
newChildFDs);
@@ -418,16 +419,16 @@
IPhysicalPropertiesVector diffOfProperties, IOptimizationContext context) {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.ORDER
- || (op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.STABLE_SORT && op
- .getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT)
+ || (op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.STABLE_SORT
+ && op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT)
|| delivered.getLocalProperties() == null) {
return false;
}
AbstractStableSortPOperator sortOp = (AbstractStableSortPOperator) op.getPhysicalOperator();
sortOp.computeLocalProperties(op);
ILocalStructuralProperty orderProp = sortOp.getOrderProperty();
- return PropertiesUtil.matchLocalProperties(Collections.singletonList(orderProp),
- delivered.getLocalProperties(), context.getEquivalenceClassMap(op), context.getFDList(op));
+ return PropertiesUtil.matchLocalProperties(Collections.singletonList(orderProp), delivered.getLocalProperties(),
+ context.getEquivalenceClassMap(op), context.getFDList(op));
}
private void addEnforcers(AbstractLogicalOperator op, int childIndex,
@@ -455,8 +456,8 @@
private void addLocalEnforcers(AbstractLogicalOperator op, int i, List<ILocalStructuralProperty> localProperties,
boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {
if (AlgebricksConfig.DEBUG) {
- AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Adding local enforcers for local props = " + localProperties
- + "\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER
+ .fine(">>>> Adding local enforcers for local props = " + localProperties + "\n");
}
if (localProperties == null || localProperties.isEmpty()) {
@@ -475,8 +476,8 @@
}
case LOCAL_GROUPING_PROPERTY: {
LocalGroupingProperty g = (LocalGroupingProperty) prop;
- Collection<LogicalVariable> vars = (g.getPreferredOrderEnforcer() != null) ? g
- .getPreferredOrderEnforcer() : g.getColumnSet();
+ Collection<LogicalVariable> vars = (g.getPreferredOrderEnforcer() != null)
+ ? g.getPreferredOrderEnforcer() : g.getColumnSet();
List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
for (LogicalVariable v : vars) {
OrderColumn oc = new OrderColumn(v, OrderKind.ASC);
@@ -502,7 +503,7 @@
private Mutable<ILogicalOperator> enforceOrderProperties(List<LocalOrderProperty> oList,
Mutable<ILogicalOperator> topOp, boolean isMicroOp, IOptimizationContext context)
- throws AlgebricksException {
+ throws AlgebricksException {
List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<Pair<IOrder, Mutable<ILogicalExpression>>>();
for (LocalOrderProperty orderProperty : oList) {
for (OrderColumn oc : orderProperty.getOrderColumns()) {
@@ -539,8 +540,8 @@
pop = new RandomMergeExchangePOperator();
} else {
if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
- IRangeMap rangeMap = (IRangeMap) op.getAnnotations().get(
- OperatorAnnotations.USE_RANGE_CONNECTOR);
+ IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
+ .get(OperatorAnnotations.USE_RANGE_CONNECTOR);
pop = new RangePartitionMergePOperator(ordCols, domain, rangeMap);
} else {
OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
@@ -574,7 +575,8 @@
break;
}
case ORDERED_PARTITIONED: {
- pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain, null);
+ pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain,
+ null);
break;
}
case BROADCAST: {
@@ -600,8 +602,8 @@
OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(exchg, context);
context.computeAndSetTypeEnvironmentForOperator(exchg);
if (AlgebricksConfig.DEBUG) {
- AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Added partitioning enforcer "
- + exchg.getPhysicalOperator() + ".\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER
+ .fine(">>>> Added partitioning enforcer " + exchg.getPhysicalOperator() + ".\n");
printOp((AbstractLogicalOperator) op);
}
}
@@ -649,8 +651,8 @@
newOp.recomputeSchema();
newOp.computeDeliveredPhysicalProperties(context);
context.computeAndSetTypeEnvironmentForOperator(newOp);
- AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Structural properties for " + newOp.getPhysicalOperator()
- + ": " + newOp.getDeliveredPhysicalProperties() + "\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Structural properties for " + newOp.getPhysicalOperator() + ": "
+ + newOp.getDeliveredPhysicalProperties() + "\n");
PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(newOp, context);
}
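
The comment in the hunk above ("rqd.getPartitioningProperty() is the same object instance as requiredProperty.getPartitioningProperty()") relies on ordinary Java aliasing: the required-property vector wraps the same mutable property object rather than a copy, so an update made through one reference is visible through the other. A tiny sketch of that effect, with MutableProperty as an illustrative stand-in for the Algebricks partitioning property classes:

import java.util.Arrays;

// Minimal stand-in for a mutable partitioning property.
class MutableProperty {
    int partitionCount;

    MutableProperty(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    @Override
    public String toString() {
        return "parts=" + partitionCount;
    }
}

public class AliasingSketch {
    public static void main(String[] args) {
        MutableProperty[] required = { new MutableProperty(4) };

        // "rqd" wraps the same instance; it does not copy it.
        MutableProperty rqd = required[0];

        // Updating through one reference updates what the array element sees too.
        rqd.partitionCount = 8;
        System.out.println(Arrays.toString(required)); // [parts=8]
    }
}
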
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 92a3691..f13187f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -28,7 +28,6 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -38,7 +37,6 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
@@ -62,7 +60,8 @@
private int lastUsedClusterId = 0;
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT
&& op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
@@ -153,16 +152,17 @@
candidate = group.get(0);
ReplicateOperator rop = new ReplicateOperator(group.size(), materializationFlags);
rop.setPhysicalOperator(new ReplicatePOperator());
- rop.setExecutionMode(ExecutionMode.PARTITIONED);
Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
AbstractLogicalOperator aopCandidate = (AbstractLogicalOperator) candidate.getValue();
List<Mutable<ILogicalOperator>> originalCandidateParents = childrenToParents.get(candidate);
+ rop.setExecutionMode(((AbstractLogicalOperator) candidate.getValue()).getExecutionMode());
if (aopCandidate.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
rop.getInputs().add(candidate);
} else {
AbstractLogicalOperator beforeExchange = new ExchangeOperator();
beforeExchange.setPhysicalOperator(new OneToOneExchangePOperator());
+ beforeExchange.setExecutionMode(rop.getExecutionMode());
Mutable<ILogicalOperator> beforeExchangeRef = new MutableObject<ILogicalOperator>(beforeExchange);
beforeExchange.getInputs().add(candidate);
context.computeAndSetTypeEnvironmentForOperator(beforeExchange);
@@ -179,6 +179,7 @@
} else {
AbstractLogicalOperator exchange = new ExchangeOperator();
exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+ exchange.setExecutionMode(rop.getExecutionMode());
MutableObject<ILogicalOperator> exchangeRef = new MutableObject<ILogicalOperator>(exchange);
exchange.getInputs().add(ropRef);
rop.getOutputs().add(exchangeRef);
@@ -203,11 +204,14 @@
}
AbstractLogicalOperator assignOperator = new AssignOperator(liveVars, assignExprs);
+ assignOperator.setExecutionMode(rop.getExecutionMode());
assignOperator.setPhysicalOperator(new AssignPOperator());
AbstractLogicalOperator projectOperator = new ProjectOperator(liveVars);
projectOperator.setPhysicalOperator(new StreamProjectPOperator());
+ projectOperator.setExecutionMode(rop.getExecutionMode());
AbstractLogicalOperator exchOp = new ExchangeOperator();
exchOp.setPhysicalOperator(new OneToOneExchangePOperator());
+ exchOp.setExecutionMode(rop.getExecutionMode());
exchOp.getInputs().add(ropRef);
MutableObject<ILogicalOperator> exchOpRef = new MutableObject<ILogicalOperator>(exchOp);
rop.getOutputs().add(exchOpRef);
@@ -239,6 +243,7 @@
} else {
AbstractLogicalOperator exchg = new ExchangeOperator();
exchg.setPhysicalOperator(new OneToOneExchangePOperator());
+ exchg.setExecutionMode(childOp.getExecutionMode());
exchg.getInputs().add(new MutableObject<ILogicalOperator>(childOp));
parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(exchg));
context.computeAndSetTypeEnvironmentForOperator(exchg);
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
index a90d347..1cefead 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
@@ -26,7 +26,6 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -45,7 +44,7 @@
/**
* Pushes projections through its input operator, provided that operator does
* not produce the projected variables.
- *
+ *
* @author Nicola
*/
public class PushProjectDownRule implements IAlgebraicRewriteRule {
@@ -56,7 +55,8 @@
}
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
return false;
@@ -79,7 +79,7 @@
private static Pair<Boolean, Boolean> pushThroughOp(HashSet<LogicalVariable> toPush,
Mutable<ILogicalOperator> opRef2, ILogicalOperator initialOp, IOptimizationContext context)
- throws AlgebricksException {
+ throws AlgebricksException {
List<LogicalVariable> initProjectList = new ArrayList<LogicalVariable>(toPush);
AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
do {
@@ -111,7 +111,7 @@
boolean canCommuteProjection = initProjectList.containsAll(toPush) && initProjectList.containsAll(produced2)
&& initProjectList.containsAll(used2);
- // if true, we can get rid of the initial projection
+ // if true, we can get rid of the initial projection
// get rid of useless decor vars.
if (!canCommuteProjection && op2.getOperatorTag() == LogicalOperatorTag.GROUP) {
@@ -191,7 +191,7 @@
// It does not try to push above another Projection.
private static boolean pushAllProjectionsOnTopOf(Collection<LogicalVariable> toPush,
Mutable<ILogicalOperator> opRef, IOptimizationContext context, ILogicalOperator initialOp)
- throws AlgebricksException {
+ throws AlgebricksException {
if (toPush.isEmpty()) {
return false;
}
@@ -201,15 +201,8 @@
return false;
}
- switch (op.getOperatorTag()) {
- case EXCHANGE: {
- opRef = opRef.getValue().getInputs().get(0);
- op = (AbstractLogicalOperator) opRef.getValue();
- break;
- }
- case PROJECT: {
- return false;
- }
+ if (op.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+ return false;
}
ProjectOperator pi2 = new ProjectOperator(new ArrayList<LogicalVariable>(toPush));
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
index daf27ac..93af981 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
@@ -21,7 +21,6 @@
import java.util.Iterator;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -37,7 +36,8 @@
public class IntroduceLeftOuterJoinForSubplanRule implements IAlgebraicRewriteRule {
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
return false;
}
@@ -74,6 +74,7 @@
InnerJoinOperator join = (InnerJoinOperator) op1;
Mutable<ILogicalOperator> leftRef = join.getInputs().get(0);
Mutable<ILogicalOperator> rightRef = join.getInputs().get(1);
+
Mutable<ILogicalOperator> ntsRef = getNtsAtEndOfPipeline(leftRef);
if (ntsRef == null) {
ntsRef = getNtsAtEndOfPipeline(rightRef);
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
index 1cda8a2..e034af0 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
@@ -25,29 +25,21 @@
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class RandomPartitionComputerFactory implements
- ITuplePartitionComputerFactory {
+public class RandomPartitionComputerFactory implements ITuplePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private final int domainCardinality;
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
- public RandomPartitionComputerFactory(int domainCardinality) {
- this.domainCardinality = domainCardinality;
- }
+ private final Random random = new Random();
- @Override
- public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer() {
-
- private final Random random = new Random();
-
- @Override
- public int partition(IFrameTupleAccessor accessor, int tIndex,
- int nParts) throws HyracksDataException {
- return random.nextInt(domainCardinality);
- }
- };
- }
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ return random.nextInt(nParts);
+ }
+ };
+ }
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 7a3a019..4d73fa5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -52,10 +52,13 @@
return new IFrameWriter() {
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- buffer.mark();
+ // Record the current position instead of using buffer.mark():
+ // the mark would be discarded if epWriters[i].nextFrame(buffer)
+ // flips or clears the buffer.
+ int pos = buffer.position();
for (int i = 0; i < epWriters.length; ++i) {
if (i != 0) {
- buffer.reset();
+ buffer.position(pos);
}
epWriters[i].nextFrame(buffer);
}
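
The comment above is the heart of the fix: a mark set with buffer.mark() is discarded when a downstream writer flips or clears the buffer, whereas an explicitly saved position can always be restored before the next writer runs. A minimal sketch of the save-and-restore pattern on a plain java.nio.ByteBuffer, where the Consumer-based writer list is illustrative rather than the IFrameWriter API:

import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;

public class ReplicateFrameSketch {

    // Hands the same frame to every writer, restoring the read position between calls
    // so each writer sees identical contents, even if an earlier writer flipped or
    // cleared the buffer (which would have invalidated a mark()).
    static void replicate(ByteBuffer frame, List<Consumer<ByteBuffer>> writers) {
        int pos = frame.position();
        for (int i = 0; i < writers.size(); i++) {
            if (i != 0) {
                frame.position(pos);
            }
            writers.get(i).accept(frame);
        }
    }

    public static void main(String[] args) {
        ByteBuffer frame = ByteBuffer.wrap("hello".getBytes());
        replicate(frame, List.of(
                b -> { System.out.println((char) b.get()); b.clear(); }, // clears the buffer
                b -> System.out.println((char) b.get())));               // still reads 'h'
    }
}

The first writer clears the buffer, which would have invalidated a mark, yet the second writer still reads the same first byte because the position was restored explicitly.
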
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index f8a8a67..38a1ebc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -22,6 +22,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -36,11 +37,17 @@
public class MaterializerTaskState extends AbstractStateObject {
private RunFileWriter out;
+ private final AtomicInteger numConsumers = new AtomicInteger(1);
public MaterializerTaskState(JobId jobId, TaskId taskId) {
super(jobId, taskId);
}
+ public MaterializerTaskState(JobId jobId, TaskId taskId, int numConsumers) {
+ super(jobId, taskId);
+ this.numConsumers.set(numConsumers);
+ }
+
@Override
public void toBytes(DataOutput out) throws IOException {
@@ -67,7 +74,7 @@
}
public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
- RunFileReader in = out.createDeleteOnCloseReader();
+ RunFileReader in = out.createReader();
try {
writer.open();
try {
@@ -75,6 +82,8 @@
while (in.nextFrame(frame)) {
writer.nextFrame(frame.getBuffer());
}
+ } catch (Throwable th) {
+ throw new HyracksDataException(th);
} finally {
in.close();
}
@@ -83,10 +92,9 @@
throw new HyracksDataException(th);
} finally {
writer.close();
+ if (numConsumers.decrementAndGet() == 0) {
+ out.getFileReference().delete();
+ }
}
}
-
- public void deleteFile() {
- out.getFileReference().delete();
- }
}
\ No newline at end of file
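
With deleteFile() gone, the materialized run is now cleaned up by whichever consumer finishes last: the state is created with the expected number of consumers, and each writeOut() decrements the counter atomically, deleting the file on the transition to zero. A compact sketch of that last-one-out pattern, using an illustrative SharedRun class in place of MaterializerTaskState and its run file:

import java.util.concurrent.atomic.AtomicInteger;

// Last-one-out cleanup: N consumers share one resource; the consumer that brings the
// counter to zero performs the cleanup exactly once, even across threads.
class SharedRun {
    private final AtomicInteger remainingConsumers;

    SharedRun(int numConsumers) {
        this.remainingConsumers = new AtomicInteger(numConsumers);
    }

    void readAndRelease(String consumer) {
        try {
            System.out.println(consumer + " reading the materialized run");
        } finally {
            if (remainingConsumers.decrementAndGet() == 0) {
                System.out.println(consumer + " deletes the run file");
            }
        }
    }
}

public class LastOneOutSketch {
    public static void main(String[] args) throws InterruptedException {
        SharedRun run = new SharedRun(2);
        Thread a = new Thread(() -> run.readAndRelease("reader-A"));
        Thread b = new Thread(() -> run.readAndRelease("reader-B"));
        a.start();
        b.start();
        a.join();
        b.join(); // exactly one "deletes the run file" line is printed
    }
}

The AtomicInteger is what makes the pattern safe when the readers run as separate tasks; a plain int counter decremented from several threads, as in the removed deinitialize() logic of SplitOperatorDescriptor below, can miss the zero transition or trigger the cleanup more than once.
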
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index feff13c..82c62a5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -43,10 +43,10 @@
private final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
private final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
- private boolean[] outputMaterializationFlags;
- private boolean requiresMaterialization;
- private int numberOfNonMaterializedOutputs = 0;
- private int numberOfActiveMaterializeReaders = 0;
+ private final boolean[] outputMaterializationFlags;
+ private final boolean requiresMaterialization;
+ private final int numberOfNonMaterializedOutputs;
+ private final int numberOfMaterializedOutputs;
public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) {
this(spec, rDesc, outputArity, new boolean[outputArity]);
@@ -59,14 +59,23 @@
recordDescriptors[i] = rDesc;
}
this.outputMaterializationFlags = outputMaterializationFlags;
- requiresMaterialization = false;
+
+ boolean reqMaterialization = false;
+ int matOutputs = 0;
+ int nonMatOutputs = 0;
for (boolean flag : outputMaterializationFlags) {
if (flag) {
- requiresMaterialization = true;
- break;
+ reqMaterialization = true;
+ matOutputs++;
+ } else {
+ nonMatOutputs++;
}
}
+ this.requiresMaterialization = reqMaterialization;
+ this.numberOfMaterializedOutputs = matOutputs;
+ this.numberOfNonMaterializedOutputs = nonMatOutputs;
+
}
@Override
@@ -75,27 +84,17 @@
new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
builder.addActivity(this, sma);
builder.addSourceEdge(0, sma, 0);
- int taskOutputIndex = 0;
+ int pipelineOutputIndex = 0;
+ int activityId = MATERIALIZE_READER_ACTIVITY_ID;
for (int i = 0; i < outputArity; i++) {
- if (!outputMaterializationFlags[i]) {
- builder.addTargetEdge(i, sma, taskOutputIndex);
- taskOutputIndex++;
- }
- }
- numberOfNonMaterializedOutputs = taskOutputIndex;
-
- if (requiresMaterialization) {
- int activityId = MATERIALIZE_READER_ACTIVITY_ID;
- for (int i = 0; i < outputArity; i++) {
- if (outputMaterializationFlags[i]) {
- MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
- new ActivityId(odId, activityId));
- builder.addActivity(this, mra);
- builder.addTargetEdge(i, mra, 0);
- builder.addBlockingEdge(sma, mra);
- numberOfActiveMaterializeReaders++;
- activityId++;
- }
+ if (outputMaterializationFlags[i]) {
+ MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
+ new ActivityId(odId, activityId++));
+ builder.addActivity(this, mra);
+ builder.addBlockingEdge(sma, mra);
+ builder.addTargetEdge(i, mra, 0);
+ } else {
+ builder.addTargetEdge(i, sma, pipelineOutputIndex++);
}
}
}
@@ -119,7 +118,7 @@
public void open() throws HyracksDataException {
if (requiresMaterialization) {
state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
- new TaskId(getActivityId(), partition));
+ new TaskId(getActivityId(), partition), numberOfMaterializedOutputs);
state.open(ctx);
}
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
@@ -215,15 +214,6 @@
state.writeOut(writer, new VSizeFrame(ctx));
}
- @Override
- public void deinitialize() throws HyracksDataException {
- numberOfActiveMaterializeReaders--;
- MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
- new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
- if (numberOfActiveMaterializeReaders == 0) {
- state.deleteFile();
- }
- }
};
}
}