[NO ISSUE][COMP] Refactor physical operator assignment rules
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Refactor SetAlgebricksPhysicalOperatorsRule and make it extensible
- Make SetAsterixPhysicalOperatorsRule a subclass of
SetAlgebricksPhysicalOperatorsRule
- Remove SetAlgebricksPhysicalOperatorsRule from Asterix rule set,
replace its invocations with SetAsterixPhysicalOperatorsRule
Change-Id: I502f367464a6fabc595cff804722f793e052570f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3367
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 8677d0f..6a11abf 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -132,7 +132,6 @@
import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnnecessarySortMergeExchange;
import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
import org.apache.hyracks.algebricks.rewriter.rules.ReuseWindowAggregateRule;
-import org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
import org.apache.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
import org.apache.hyracks.algebricks.rewriter.rules.SimpleUnnestToProductRule;
import org.apache.hyracks.algebricks.rewriter.rules.SwitchInnerJoinBranchRule;
@@ -359,7 +358,6 @@
physicalRewritesAllLevels.add(new PullSelectOutOfEqJoin());
//Turned off the following rule for now not to change OptimizerTest results.
physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
- physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
physicalRewritesAllLevels.add(new CheckFullParallelSortRule());
@@ -373,7 +371,7 @@
physicalRewritesAllLevels.add(new RemoveUnusedAssignAndAggregateRule());
physicalRewritesAllLevels.add(new ConsolidateAssignsRule());
// After adding projects, we may need need to set physical operators again.
- physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
+ physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
return physicalRewritesAllLevels;
}
@@ -390,7 +388,7 @@
// remove assigns that could become unused after PushLimitIntoPrimarySearchRule
physicalRewritesTopLevel.add(new RemoveUnusedAssignAndAggregateRule());
physicalRewritesTopLevel.add(new IntroduceProjectsRule());
- physicalRewritesTopLevel.add(new SetAlgebricksPhysicalOperatorsRule());
+ physicalRewritesTopLevel.add(new SetAsterixPhysicalOperatorsRule());
physicalRewritesTopLevel.add(new IntroduceRapidFrameFlushProjectAssignRule());
physicalRewritesTopLevel.add(new SetExecutionModeRule());
physicalRewritesTopLevel.add(new IntroduceRandomPartitioningFeedComputationRule());
@@ -400,7 +398,7 @@
public static final List<IAlgebraicRewriteRule> prepareForJobGenRuleCollection() {
List<IAlgebraicRewriteRule> prepareForJobGenRewrites = new LinkedList<>();
prepareForJobGenRewrites.add(new InsertProjectBeforeUnionRule());
- prepareForJobGenRewrites.add(new SetAlgebricksPhysicalOperatorsRule());
+ prepareForJobGenRewrites.add(new SetAsterixPhysicalOperatorsRule());
prepareForJobGenRewrites
.add(new IsolateHyracksOperatorsRule(HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));
prepareForJobGenRewrites.add(new FixReplicateOperatorOutputsRule());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 4314b3a..b26eaca 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -37,349 +37,223 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
-public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule {
+public final class SetAsterixPhysicalOperatorsRule extends SetAlgebricksPhysicalOperatorsRule {
@Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
- return false;
+ protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean> createPhysicalOperatorFactoryVisitor(
+ IOptimizationContext context) {
+ return new AsterixPhysicalOperatorFactoryVisitor(context);
}
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
- AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
- if (context.checkIfInDontApplySet(this, op)) {
- return false;
+ private static class AsterixPhysicalOperatorFactoryVisitor extends AlgebricksPhysicalOperatorFactoryVisitor {
+
+ private AsterixPhysicalOperatorFactoryVisitor(IOptimizationContext context) {
+ super(context);
}
- computeDefaultPhysicalOp(op, true, context);
- context.addToDontApplySet(this, op);
- return true;
- }
+ @Override
+ public ExternalGroupByPOperator createExternalGroupByPOperator(GroupByOperator gby) throws AlgebricksException {
+ Mutable<ILogicalOperator> r0 = gby.getNestedPlans().get(0).getRoots().get(0);
+ if (!r0.getValue().getOperatorTag().equals(LogicalOperatorTag.AGGREGATE)) {
+ return null;
+ }
+ AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+ boolean serializable = aggOp.getExpressions().stream()
+ .allMatch(exprRef -> exprRef.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL
+ && BuiltinFunctions.isAggregateFunctionSerializable(
+ ((AbstractFunctionCallExpression) exprRef.getValue()).getFunctionIdentifier()));
+ if (!serializable) {
+ return null;
+ }
- private static void setPhysicalOperators(ILogicalPlan plan, boolean topLevelOp, IOptimizationContext context)
- throws AlgebricksException {
- for (Mutable<ILogicalOperator> root : plan.getRoots()) {
- computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), topLevelOp, context);
+ // if serializable, use external group-by
+ // now check whether the serialized version aggregation function has corresponding intermediate agg
+ IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
+ context.getMergeAggregationExpressionFactory();
+ List<LogicalVariable> originalVariables = aggOp.getVariables();
+ List<Mutable<ILogicalExpression>> aggExprs = aggOp.getExpressions();
+ int aggNum = aggExprs.size();
+ for (int i = 0; i < aggNum; i++) {
+ AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) aggExprs.get(i).getValue();
+ AggregateFunctionCallExpression serialAggExpr = BuiltinFunctions
+ .makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(), expr.getArguments());
+ serialAggExpr.setSourceLocation(expr.getSourceLocation());
+ if (mergeAggregationExpressionFactory.createMergeAggregation(originalVariables.get(i), serialAggExpr,
+ context) == null) {
+ return null;
+ }
+ }
+
+ // Check whether there are multiple aggregates in the sub plan.
+ // Currently, we don't support multiple aggregates in one external group-by.
+ ILogicalOperator r1Logical = aggOp;
+ while (r1Logical.hasInputs()) {
+ r1Logical = r1Logical.getInputs().get(0).getValue();
+ if (r1Logical.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ return null;
+ }
+ }
+
+ for (int i = 0; i < aggNum; i++) {
+ AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) aggExprs.get(i).getValue();
+ AggregateFunctionCallExpression serialAggExpr = BuiltinFunctions
+ .makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(), expr.getArguments());
+ serialAggExpr.setSourceLocation(expr.getSourceLocation());
+ aggOp.getExpressions().get(i).setValue(serialAggExpr);
+ }
+
+ generateMergeAggregationExpressions(gby);
+
+ return new ExternalGroupByPOperator(gby.getGroupByVarList(),
+ physicalOptimizationConfig.getMaxFramesForGroupBy(),
+ (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
+ * physicalOptimizationConfig.getFrameSize());
}
- }
- private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
- IOptimizationContext context) throws AlgebricksException {
- PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
- if (op.getOperatorTag().equals(LogicalOperatorTag.GROUP)) {
- GroupByOperator gby = (GroupByOperator) op;
- if (gby.getNestedPlans().size() == 1) {
- ILogicalPlan p0 = gby.getNestedPlans().get(0);
- if (p0.getRoots().size() == 1) {
- Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
- if (r0.getValue().getOperatorTag().equals(LogicalOperatorTag.AGGREGATE)) {
- AggregateOperator aggOp = (AggregateOperator) r0.getValue();
- boolean serializable = true;
- for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
- AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef.getValue();
- if (!BuiltinFunctions.isAggregateFunctionSerializable(expr.getFunctionIdentifier())) {
- serializable = false;
- break;
- }
- }
+ private void generateMergeAggregationExpressions(GroupByOperator gby) throws AlgebricksException {
+ if (gby.getNestedPlans().size() != 1) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
+ "External group-by currently works only for one nested plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
+ }
+ ILogicalPlan p0 = gby.getNestedPlans().get(0);
+ if (p0.getRoots().size() != 1) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
+ "External group-by currently works only for one nested plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
+ }
+ IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
+ context.getMergeAggregationExpressionFactory();
+ Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+ AbstractLogicalOperator r0Logical = (AbstractLogicalOperator) r0.getValue();
+ if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
+ "The merge aggregation expression generation should not process a " + r0Logical.getOperatorTag()
+ + " operator.");
+ }
+ AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+ List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
+ List<LogicalVariable> aggProducedVars = aggOp.getVariables();
+ int n = aggOp.getExpressions().size();
+ List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ ILogicalExpression aggFuncExpr = aggFuncRefs.get(i).getValue();
+ ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
+ .createMergeAggregation(aggProducedVars.get(i), aggFuncExpr, context);
+ if (mergeExpr == null) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR, aggFuncExpr.getSourceLocation(),
+ "The aggregation function "
+ + ((AbstractFunctionCallExpression) aggFuncExpr).getFunctionIdentifier().getName()
+ + " does not have a registered intermediate aggregation function.");
+ }
+ mergeExpressionRefs.add(new MutableObject<>(mergeExpr));
+ }
+ aggOp.setMergeExpressions(mergeExpressionRefs);
+ }
- if ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
- .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
- boolean setToExternalGby = false;
- if (serializable) {
- // if serializable, use external group-by
- // now check whether the serialized version aggregation function has corresponding intermediate agg
- boolean hasIntermediateAgg = true;
- IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
- context.getMergeAggregationExpressionFactory();
- List<LogicalVariable> originalVariables = aggOp.getVariables();
- List<Mutable<ILogicalExpression>> aggExprs = aggOp.getExpressions();
- int aggNum = aggExprs.size();
- for (int i = 0; i < aggNum; i++) {
- AbstractFunctionCallExpression expr =
- (AbstractFunctionCallExpression) aggExprs.get(i).getValue();
- AggregateFunctionCallExpression serialAggExpr =
- BuiltinFunctions.makeSerializableAggregateFunctionExpression(
- expr.getFunctionIdentifier(), expr.getArguments());
- serialAggExpr.setSourceLocation(expr.getSourceLocation());
- if (mergeAggregationExpressionFactory.createMergeAggregation(
- originalVariables.get(i), serialAggExpr, context) == null) {
- hasIntermediateAgg = false;
- break;
- }
- }
+ @Override
+ public IPhysicalOperator visitUnnestMapOperator(UnnestMapOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ return visitAbstractUnnestMapOperator(op);
+ }
- // Check whether there are multiple aggregates in the sub plan.
- // Currently, we don't support multiple aggregates in one external group-by.
- boolean multipleAggOpsFound = false;
- ILogicalOperator r1Logical = aggOp;
- while (r1Logical.hasInputs()) {
- r1Logical = r1Logical.getInputs().get(0).getValue();
- if (r1Logical.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
- multipleAggOpsFound = true;
- break;
- }
- }
+ @Override
+ public IPhysicalOperator visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ return visitAbstractUnnestMapOperator(op);
+ }
- if (hasIntermediateAgg && !multipleAggOpsFound) {
- for (int i = 0; i < aggNum; i++) {
- AbstractFunctionCallExpression expr =
- (AbstractFunctionCallExpression) aggExprs.get(i).getValue();
- AggregateFunctionCallExpression serialAggExpr =
- BuiltinFunctions.makeSerializableAggregateFunctionExpression(
- expr.getFunctionIdentifier(), expr.getArguments());
- serialAggExpr.setSourceLocation(expr.getSourceLocation());
- aggOp.getExpressions().get(i).setValue(serialAggExpr);
- }
- ExternalGroupByPOperator externalGby =
- new ExternalGroupByPOperator(gby.getGroupByVarList(),
- physicalOptimizationConfig.getMaxFramesForGroupBy(),
- (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
- * physicalOptimizationConfig.getFrameSize());
- generateMergeAggregationExpressions(gby, context);
- op.setPhysicalOperator(externalGby);
- setToExternalGby = true;
- }
- }
-
- if (!setToExternalGby) {
- // if not serializable or no intermediate agg, use pre-clustered group-by
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
- List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
- ILogicalExpression expr = p.second.getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
- columnList.add(varRef.getVariableReference());
- }
- }
- op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
- context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
- }
- }
- } else if (r0.getValue().getOperatorTag().equals(LogicalOperatorTag.RUNNINGAGGREGATE)) {
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
- List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
- ILogicalExpression expr = p.second.getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
- columnList.add(varRef.getVariableReference());
- }
- }
- op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
- context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
- } else {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
- "Unsupported nested operator within a group-by: "
- + r0.getValue().getOperatorTag().name());
- }
+ private IPhysicalOperator visitAbstractUnnestMapOperator(AbstractUnnestMapOperator op)
+ throws AlgebricksException {
+ ILogicalExpression unnestExpr = op.getExpressionRef().getValue();
+ if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation());
+ }
+ AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
+ if (!f.getFunctionIdentifier().equals(BuiltinFunctions.INDEX_SEARCH)) {
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation());
+ }
+ AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
+ jobGenParams.readFromFuncArgs(f.getArguments());
+ MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
+ DataSourceId dataSourceId =
+ new DataSourceId(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
+ Dataset dataset = mp.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
+ IDataSourceIndex<String, DataSourceId> dsi =
+ mp.findDataSourceIndex(jobGenParams.getIndexName(), dataSourceId);
+ INodeDomain storageDomain = mp.findNodeDomain(dataset.getNodeGroupName());
+ if (dsi == null) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR, op.getSourceLocation(),
+ "Could not find index " + jobGenParams.getIndexName() + " for dataset " + dataSourceId);
+ }
+ IndexType indexType = jobGenParams.getIndexType();
+ boolean requiresBroadcast = jobGenParams.getRequiresBroadcast();
+ switch (indexType) {
+ case BTREE: {
+ BTreeJobGenParams btreeJobGenParams = new BTreeJobGenParams();
+ btreeJobGenParams.readFromFuncArgs(f.getArguments());
+ return new BTreeSearchPOperator(dsi, storageDomain, requiresBroadcast,
+ btreeJobGenParams.isPrimaryIndex(), btreeJobGenParams.isEqCondition(),
+ btreeJobGenParams.getLowKeyVarList(), btreeJobGenParams.getHighKeyVarList());
+ }
+ case RTREE: {
+ return new RTreeSearchPOperator(dsi, storageDomain, requiresBroadcast);
+ }
+ case SINGLE_PARTITION_WORD_INVIX:
+ case SINGLE_PARTITION_NGRAM_INVIX: {
+ return new InvertedIndexPOperator(dsi, storageDomain, requiresBroadcast, false);
+ }
+ case LENGTH_PARTITIONED_WORD_INVIX:
+ case LENGTH_PARTITIONED_NGRAM_INVIX: {
+ return new InvertedIndexPOperator(dsi, storageDomain, requiresBroadcast, true);
+ }
+ default: {
+ throw AlgebricksException.create(
+ org.apache.hyracks.api.exceptions.ErrorCode.OPERATOR_NOT_IMPLEMENTED,
+ op.getSourceLocation(), op.getOperatorTag().toString() + " with " + indexType + " index");
}
}
}
- if (op.getPhysicalOperator() == null) {
- switch (op.getOperatorTag()) {
- case INNERJOIN: {
- JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, context);
- break;
- }
- case LEFTOUTERJOIN: {
- JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, topLevelOp, context);
- break;
- }
- case UNNEST_MAP:
- case LEFT_OUTER_UNNEST_MAP: {
- ILogicalExpression unnestExpr = null;
- unnestExpr = ((AbstractUnnestMapOperator) op).getExpressionRef().getValue();
- if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
- AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
- FunctionIdentifier fid = f.getFunctionIdentifier();
- if (!fid.equals(BuiltinFunctions.INDEX_SEARCH)) {
- throw new IllegalStateException();
- }
- AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
- jobGenParams.readFromFuncArgs(f.getArguments());
- MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
- DataSourceId dataSourceId =
- new DataSourceId(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
- Dataset dataset =
- mp.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
- IDataSourceIndex<String, DataSourceId> dsi =
- mp.findDataSourceIndex(jobGenParams.getIndexName(), dataSourceId);
- INodeDomain storageDomain = mp.findNodeDomain(dataset.getNodeGroupName());
- if (dsi == null) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR, op.getSourceLocation(),
- "Could not find index " + jobGenParams.getIndexName() + " for dataset "
- + dataSourceId);
- }
- IndexType indexType = jobGenParams.getIndexType();
- boolean requiresBroadcast = jobGenParams.getRequiresBroadcast();
- switch (indexType) {
- case BTREE: {
- BTreeJobGenParams btreeJobGenParams = new BTreeJobGenParams();
- btreeJobGenParams.readFromFuncArgs(f.getArguments());
- op.setPhysicalOperator(new BTreeSearchPOperator(dsi, storageDomain, requiresBroadcast,
- btreeJobGenParams.isPrimaryIndex(), btreeJobGenParams.isEqCondition(),
- btreeJobGenParams.getLowKeyVarList(), btreeJobGenParams.getHighKeyVarList()));
- break;
- }
- case RTREE: {
- op.setPhysicalOperator(new RTreeSearchPOperator(dsi, storageDomain, requiresBroadcast));
- break;
- }
- case SINGLE_PARTITION_WORD_INVIX:
- case SINGLE_PARTITION_NGRAM_INVIX: {
- op.setPhysicalOperator(
- new InvertedIndexPOperator(dsi, storageDomain, requiresBroadcast, false));
- break;
- }
- case LENGTH_PARTITIONED_WORD_INVIX:
- case LENGTH_PARTITIONED_NGRAM_INVIX: {
- op.setPhysicalOperator(
- new InvertedIndexPOperator(dsi, storageDomain, requiresBroadcast, true));
- break;
- }
- default: {
- throw new NotImplementedException(indexType + " indexes are not implemented.");
- }
- }
- }
- break;
- }
- case WINDOW: {
- WindowOperator winOp = (WindowOperator) op;
- WindowPOperator physOp = createWindowPOperator(winOp, context);
- op.setPhysicalOperator(physOp);
- break;
- }
- }
- }
- if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
- for (ILogicalPlan p : nested.getNestedPlans()) {
- setPhysicalOperators(p, false, context);
- }
- }
- for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
- computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), topLevelOp, context);
+
+ @Override
+ public WindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException {
+ boolean partitionMaterialization = winOp.hasNestedPlans() || AnalysisUtil.hasFunctionWithProperty(winOp,
+ BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
+ boolean frameStartIsMonotonic = AnalysisUtil
+ .isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions());
+ boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
+ winOp.getFrameValueExpressions());
+ boolean nestedTrivialAggregates = winOp.hasNestedPlans()
+ && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
+
+ return new WindowPOperator(winOp.getPartitionVarList(), partitionMaterialization,
+ winOp.getOrderColumnList(), frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates,
+ context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
}
}
-
- private static void generateMergeAggregationExpressions(GroupByOperator gby, IOptimizationContext context)
- throws AlgebricksException {
- if (gby.getNestedPlans().size() != 1) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
- "External group-by currently works only for one nested plan with one root containing"
- + "an aggregate and a nested-tuple-source.");
- }
- ILogicalPlan p0 = gby.getNestedPlans().get(0);
- if (p0.getRoots().size() != 1) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
- "External group-by currently works only for one nested plan with one root containing"
- + "an aggregate and a nested-tuple-source.");
- }
- IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
- context.getMergeAggregationExpressionFactory();
- Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
- AbstractLogicalOperator r0Logical = (AbstractLogicalOperator) r0.getValue();
- if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
- "The merge aggregation expression generation should not process a " + r0Logical.getOperatorTag()
- + " operator.");
- }
- AggregateOperator aggOp = (AggregateOperator) r0.getValue();
- List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
- List<LogicalVariable> aggProducedVars = aggOp.getVariables();
- int n = aggOp.getExpressions().size();
- List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
- for (int i = 0; i < n; i++) {
- ILogicalExpression aggFuncExpr = aggFuncRefs.get(i).getValue();
- ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
- .createMergeAggregation(aggProducedVars.get(i), aggFuncExpr, context);
- if (mergeExpr == null) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR, aggFuncExpr.getSourceLocation(),
- "The aggregation function "
- + ((AbstractFunctionCallExpression) aggFuncExpr).getFunctionIdentifier().getName()
- + " does not have a registered intermediate aggregation function.");
- }
- mergeExpressionRefs.add(new MutableObject<>(mergeExpr));
- }
- aggOp.setMergeExpressions(mergeExpressionRefs);
- }
-
- private static WindowPOperator createWindowPOperator(WindowOperator winOp, IOptimizationContext context)
- throws CompilationException {
- List<Mutable<ILogicalExpression>> partitionExprs = winOp.getPartitionExpressions();
- List<LogicalVariable> partitionColumns = new ArrayList<>(partitionExprs.size());
- for (Mutable<ILogicalExpression> pe : partitionExprs) {
- ILogicalExpression partExpr = pe.getValue();
- if (partExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, winOp.getSourceLocation(),
- "Window partition/order expression has not been normalized");
- }
- LogicalVariable var = ((VariableReferenceExpression) partExpr).getVariableReference();
- partitionColumns.add(var);
- }
- List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprs = winOp.getOrderExpressions();
- List<OrderColumn> orderColumns = new ArrayList<>(orderExprs.size());
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExprs) {
- ILogicalExpression orderExpr = p.second.getValue();
- if (orderExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, winOp.getSourceLocation(),
- "Window partition/order expression has not been normalized");
- }
- LogicalVariable var = ((VariableReferenceExpression) orderExpr).getVariableReference();
- orderColumns.add(new OrderColumn(var, p.first.getKind()));
- }
-
- boolean partitionMaterialization = winOp.hasNestedPlans() || AnalysisUtil.hasFunctionWithProperty(winOp,
- BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
- boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(),
- winOp.getFrameValueExpressions());
- boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
- winOp.getFrameValueExpressions());
- boolean nestedTrivialAggregates = winOp.hasNestedPlans()
- && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
-
- int memSizeInFrames = context.getPhysicalOptimizationConfig().getMaxFramesForWindow();
-
- return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic,
- frameEndIsMonotonic, nestedTrivialAggregates, memSizeInFrames);
- }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
index 0235dad..db2290c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
@@ -29,9 +29,12 @@
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
@@ -375,4 +378,28 @@
public boolean requiresVariableReferenceExpressions() {
return false;
}
+
+ public List<LogicalVariable> getPartitionVarList() {
+ List<LogicalVariable> varList = new ArrayList<>(partitionExpressions.size());
+ for (Mutable<ILogicalExpression> pe : partitionExpressions) {
+ ILogicalExpression partExpr = pe.getValue();
+ if (partExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ LogicalVariable var = ((VariableReferenceExpression) partExpr).getVariableReference();
+ varList.add(var);
+ }
+ }
+ return varList;
+ }
+
+ public List<OrderColumn> getOrderColumnList() {
+ List<OrderColumn> orderColumns = new ArrayList<>(orderExpressions.size());
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) {
+ ILogicalExpression orderExpr = p.second.getValue();
+ if (orderExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ LogicalVariable var = ((VariableReferenceExpression) orderExpr).getVariableReference();
+ orderColumns.add(new OrderColumn(var, p.first.getKind()));
+ }
+ }
+ return orderColumns;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 1d5a7e9..49e5a0b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -18,10 +18,10 @@
*/
package org.apache.hyracks.algebricks.rewriter.rules;
-import static org.apache.hyracks.api.exceptions.ErrorCode.ORDER_EXPR_NOT_NORMALIZED;
-
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.function.Function;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -32,6 +32,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -42,17 +43,41 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
@@ -88,10 +113,13 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.TokenizePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.UnionAllPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.UnnestPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.WriteResultPOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
+import org.apache.hyracks.api.exceptions.ErrorCode;
public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule {
@@ -105,385 +133,476 @@
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
- // if (context.checkIfInDontApplySet(this, op)) {
- // return false;
- // }
if (op.getPhysicalOperator() != null) {
return false;
}
-
- computeDefaultPhysicalOp(op, true, context);
- // context.addToDontApplySet(this, op);
+ computeDefaultPhysicalOp(op, true, createPhysicalOperatorFactoryVisitor(context));
return true;
}
- private static void setPhysicalOperators(ILogicalPlan plan, boolean topLevelOp, IOptimizationContext context)
- throws AlgebricksException {
- for (Mutable<ILogicalOperator> root : plan.getRoots()) {
- computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), topLevelOp, context);
- }
- }
-
private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
- IOptimizationContext context) throws AlgebricksException {
- PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
+ ILogicalOperatorVisitor<IPhysicalOperator, Boolean> physOpFactory) throws AlgebricksException {
if (op.getPhysicalOperator() == null) {
- switch (op.getOperatorTag()) {
- case AGGREGATE: {
- op.setPhysicalOperator(new AggregatePOperator());
- break;
- }
- case ASSIGN: {
- op.setPhysicalOperator(new AssignPOperator());
- break;
- }
- case DISTINCT: {
- DistinctOperator distinct = (DistinctOperator) op;
- if (topLevelOp) {
- distinct.setPhysicalOperator(new PreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
- } else {
- distinct.setPhysicalOperator(
- new MicroPreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
- }
- break;
- }
- case EMPTYTUPLESOURCE: {
- op.setPhysicalOperator(new EmptyTupleSourcePOperator());
- break;
- }
- case EXCHANGE: {
- if (op.getPhysicalOperator() == null) {
- throw new AlgebricksException("Implementation for EXCHANGE operator was not set.");
- }
- // implem. choice for exchange should be set by a parent op.
- break;
- }
- case GROUP: {
- GroupByOperator gby = (GroupByOperator) op;
-
- if (gby.getNestedPlans().size() == 1) {
- ILogicalPlan p0 = gby.getNestedPlans().get(0);
- if (p0.getRoots().size() == 1) {
- if ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE)
- || (gby.getAnnotations()
- .get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
- if (!topLevelOp) {
- throw new NotImplementedException(
- "External hash group-by for nested grouping is not implemented.");
- }
-
- boolean hasIntermediateAgg = generateMergeAggregationExpressions(gby, context);
- if (hasIntermediateAgg) {
- ExternalGroupByPOperator externalGby =
- new ExternalGroupByPOperator(gby.getGroupByVarList(),
- physicalOptimizationConfig.getMaxFramesForGroupBy(),
- (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
- * physicalOptimizationConfig.getFrameSize());
- op.setPhysicalOperator(externalGby);
- break;
- }
- }
- }
- }
-
- if (topLevelOp) {
- op.setPhysicalOperator(new PreclusteredGroupByPOperator(gby.getGroupByVarList(),
- gby.isGroupAll(), context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
- } else {
- op.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
- context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
- }
- break;
- }
- case INNERJOIN: {
- JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, context);
- break;
- }
- case LEFTOUTERJOIN: {
- JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, topLevelOp, context);
- break;
- }
- case LIMIT: {
- op.setPhysicalOperator(new StreamLimitPOperator());
- break;
- }
- case NESTEDTUPLESOURCE: {
- op.setPhysicalOperator(new NestedTupleSourcePOperator());
- break;
- }
- case ORDER: {
- OrderOperator oo = (OrderOperator) op;
- for (Pair<IOrder, Mutable<ILogicalExpression>> p : oo.getOrderExpressions()) {
- ILogicalExpression e = p.second.getValue();
- if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw AlgebricksException.create(ORDER_EXPR_NOT_NORMALIZED, e.getSourceLocation());
- }
- }
- if (topLevelOp) {
- op.setPhysicalOperator(new StableSortPOperator(
- physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK()));
- } else {
- op.setPhysicalOperator(new InMemoryStableSortPOperator());
- }
- break;
- }
- case PROJECT: {
- op.setPhysicalOperator(new StreamProjectPOperator());
- break;
- }
- case RUNNINGAGGREGATE: {
- op.setPhysicalOperator(new RunningAggregatePOperator());
- break;
- }
- case REPLICATE: {
- op.setPhysicalOperator(new ReplicatePOperator());
- break;
- }
- case SPLIT:
- op.setPhysicalOperator(new SplitPOperator());
- break;
- case SCRIPT: {
- op.setPhysicalOperator(new StringStreamingScriptPOperator());
- break;
- }
- case SELECT: {
- op.setPhysicalOperator(new StreamSelectPOperator());
- break;
- }
- case SUBPLAN: {
- op.setPhysicalOperator(new SubplanPOperator());
- break;
- }
- case UNIONALL: {
- if (topLevelOp) {
- op.setPhysicalOperator(new UnionAllPOperator());
- } else {
- op.setPhysicalOperator(new MicroUnionAllPOperator());
- }
- break;
- }
- case INTERSECT: {
- if (topLevelOp) {
- op.setPhysicalOperator(new IntersectPOperator());
- } else {
- throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag());
- }
- break;
- }
- case UNNEST: {
- op.setPhysicalOperator(new UnnestPOperator());
- break;
- }
- case LEFT_OUTER_UNNEST:
- op.setPhysicalOperator(new LeftOuterUnnestPOperator());
- break;
- case DATASOURCESCAN: {
- DataSourceScanOperator scan = (DataSourceScanOperator) op;
- IDataSource dataSource = scan.getDataSource();
- DataSourceScanPOperator dss = new DataSourceScanPOperator(dataSource);
- if (dataSource.isScanAccessPathALeaf()) {
- dss.disableJobGenBelowMe();
- }
- op.setPhysicalOperator(dss);
- break;
- }
- case WRITE: {
- op.setPhysicalOperator(new SinkWritePOperator());
- break;
- }
- case DISTRIBUTE_RESULT: {
- op.setPhysicalOperator(new DistributeResultPOperator());
- break;
- }
- case WRITE_RESULT: {
- WriteResultOperator opLoad = (WriteResultOperator) op;
- LogicalVariable payload;
- List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
- List<LogicalVariable> additionalFilteringKeys = null;
- payload = getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getKeyExpressions(), keys);
- if (opLoad.getAdditionalFilteringExpressions() != null) {
- additionalFilteringKeys = new ArrayList<LogicalVariable>();
- getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
- }
- op.setPhysicalOperator(
- new WriteResultPOperator(opLoad.getDataSource(), payload, keys, additionalFilteringKeys));
- break;
- }
- case INSERT_DELETE_UPSERT: {
- // Primary index
- InsertDeleteUpsertOperator opLoad = (InsertDeleteUpsertOperator) op;
- LogicalVariable payload;
- List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
- List<LogicalVariable> additionalFilteringKeys = null;
- List<LogicalVariable> additionalNonFilterVariables = null;
- if (opLoad.getAdditionalNonFilteringExpressions() != null) {
- additionalNonFilterVariables = new ArrayList<LogicalVariable>();
- getKeys(opLoad.getAdditionalNonFilteringExpressions(), additionalNonFilterVariables);
- }
- payload = getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getPrimaryKeyExpressions(), keys);
- if (opLoad.getAdditionalFilteringExpressions() != null) {
- additionalFilteringKeys = new ArrayList<LogicalVariable>();
- getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
- }
- if (opLoad.isBulkload()) {
- op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys,
- additionalNonFilterVariables, opLoad.getDataSource()));
- } else {
- op.setPhysicalOperator(new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys,
- opLoad.getDataSource(), opLoad.getOperation(), additionalNonFilterVariables));
- }
- break;
- }
- case INDEX_INSERT_DELETE_UPSERT: {
- // Secondary index
- IndexInsertDeleteUpsertOperator opInsDel = (IndexInsertDeleteUpsertOperator) op;
- List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
- List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
- List<LogicalVariable> additionalFilteringKeys = null;
- getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
- getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
- if (opInsDel.getAdditionalFilteringExpressions() != null) {
- additionalFilteringKeys = new ArrayList<LogicalVariable>();
- getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
- }
- if (opInsDel.isBulkload()) {
- op.setPhysicalOperator(
- new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
- opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
- } else {
- LogicalVariable upsertIndicatorVar = null;
- List<LogicalVariable> prevSecondaryKeys = null;
- LogicalVariable prevAdditionalFilteringKey = null;
- if (opInsDel.getOperation() == Kind.UPSERT) {
- upsertIndicatorVar = getKey(opInsDel.getUpsertIndicatorExpr().getValue());
- prevSecondaryKeys = new ArrayList<LogicalVariable>();
- getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
- if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
- prevAdditionalFilteringKey =
- ((VariableReferenceExpression) (opInsDel.getPrevAdditionalFilteringExpression())
- .getValue()).getVariableReference();
- }
- }
- op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
- additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
- upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKey,
- opInsDel.getNumberOfAdditionalNonFilteringFields()));
- }
- break;
-
- }
- case TOKENIZE: {
- TokenizeOperator opTokenize = (TokenizeOperator) op;
- List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
- List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
- getKeys(opTokenize.getPrimaryKeyExpressions(), primaryKeys);
- getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
- // Tokenize Operator only operates with a bulk load on a data set with an index
- if (opTokenize.isBulkload()) {
- op.setPhysicalOperator(
- new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize.getDataSourceIndex()));
- }
- break;
- }
- case SINK: {
- op.setPhysicalOperator(new SinkPOperator());
- break;
- }
- case FORWARD:
- op.setPhysicalOperator(new SortForwardPOperator());
- break;
+ IPhysicalOperator physOp = op.accept(physOpFactory, topLevelOp);
+ if (physOp == null) {
+ throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET, op.getSourceLocation(),
+ op.getOperatorTag());
}
+ op.setPhysicalOperator(physOp);
}
if (op.hasNestedPlans()) {
AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
for (ILogicalPlan p : nested.getNestedPlans()) {
- setPhysicalOperators(p, false, context);
+ for (Mutable<ILogicalOperator> root : p.getRoots()) {
+ computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), false, physOpFactory);
+ }
}
}
for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
- computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), topLevelOp, context);
+ computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), topLevelOp, physOpFactory);
}
}
- private static void getKeys(List<Mutable<ILogicalExpression>> keyExpressions, List<LogicalVariable> keys) {
- for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
- keys.add(getKey(kExpr.getValue()));
- }
+ protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean> createPhysicalOperatorFactoryVisitor(
+ IOptimizationContext context) {
+ return new AlgebricksPhysicalOperatorFactoryVisitor(context);
}
- private static LogicalVariable getKey(ILogicalExpression keyExpression) {
- if (keyExpression.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new NotImplementedException();
- }
- return ((VariableReferenceExpression) keyExpression).getVariableReference();
- }
+ protected static class AlgebricksPhysicalOperatorFactoryVisitor
+ implements ILogicalOperatorVisitor<IPhysicalOperator, Boolean> {
- private static LogicalVariable getKeysAndLoad(Mutable<ILogicalExpression> payloadExpr,
- List<Mutable<ILogicalExpression>> keyExpressions, List<LogicalVariable> keys) {
- LogicalVariable payload;
- if (payloadExpr.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new NotImplementedException();
- }
- payload = ((VariableReferenceExpression) payloadExpr.getValue()).getVariableReference();
+ protected final IOptimizationContext context;
- for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
- ILogicalExpression e = kExpr.getValue();
- if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ protected final PhysicalOptimizationConfig physicalOptimizationConfig;
+
+ protected AlgebricksPhysicalOperatorFactoryVisitor(IOptimizationContext context) {
+ this.context = context;
+ this.physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
+ }
+
+ @Override
+ public IPhysicalOperator visitAggregateOperator(AggregateOperator op, Boolean topLevelOp) {
+ return new AggregatePOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) {
+ return new AssignPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitDistinctOperator(DistinctOperator distinct, Boolean topLevelOp) {
+ if (topLevelOp) {
+ return new PreSortedDistinctByPOperator(distinct.getDistinctByVarList());
+ } else {
+ return new MicroPreSortedDistinctByPOperator(distinct.getDistinctByVarList());
+ }
+ }
+
+ @Override
+ public IPhysicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Boolean topLevelOp) {
+ return new EmptyTupleSourcePOperator();
+ }
+
+ @Override
+ public final IPhysicalOperator visitGroupByOperator(GroupByOperator gby, Boolean topLevelOp)
+ throws AlgebricksException {
+
+ ensureAllVariables(gby.getGroupByList(), Pair::getSecond);
+
+ if (gby.getNestedPlans().size() == 1 && gby.getNestedPlans().get(0).getRoots().size() == 1) {
+ if (topLevelOp && ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE)
+ || (gby.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE))) {
+ ExternalGroupByPOperator extGby = createExternalGroupByPOperator(gby);
+ if (extGby != null) {
+ return extGby;
+ }
+ }
+ }
+
+ if (topLevelOp) {
+ return new PreclusteredGroupByPOperator(gby.getGroupByVarList(), gby.isGroupAll(),
+ context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+ } else {
+ return new MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
+ context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+ }
+ }
+
+ protected ExternalGroupByPOperator createExternalGroupByPOperator(GroupByOperator gby)
+ throws AlgebricksException {
+ boolean hasIntermediateAgg = generateMergeAggregationExpressions(gby);
+ if (!hasIntermediateAgg) {
+ return null;
+ }
+ return new ExternalGroupByPOperator(gby.getGroupByVarList(),
+ physicalOptimizationConfig.getMaxFramesForGroupBy(),
+ (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
+ * physicalOptimizationConfig.getFrameSize());
+ }
+
+ @Override
+ public IPhysicalOperator visitInnerJoinOperator(InnerJoinOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
+ return op.getPhysicalOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
+ return op.getPhysicalOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitLimitOperator(LimitOperator op, Boolean topLevelOp) {
+ return new StreamLimitPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Boolean topLevelOp) {
+ return new NestedTupleSourcePOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitOrderOperator(OrderOperator oo, Boolean topLevelOp) throws AlgebricksException {
+ ensureAllVariables(oo.getOrderExpressions(), Pair::getSecond);
+ if (topLevelOp) {
+ return new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK());
+ } else {
+ return new InMemoryStableSortPOperator();
+ }
+ }
+
+ @Override
+ public IPhysicalOperator visitProjectOperator(ProjectOperator op, Boolean topLevelOp) {
+ return new StreamProjectPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Boolean topLevelOp) {
+ return new RunningAggregatePOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitReplicateOperator(ReplicateOperator op, Boolean topLevelOp) {
+ return new ReplicatePOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitSplitOperator(SplitOperator op, Boolean topLevelOp) {
+ return new SplitPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitScriptOperator(ScriptOperator op, Boolean topLevelOp) {
+ return new StringStreamingScriptPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitSelectOperator(SelectOperator op, Boolean topLevelOp) {
+ return new StreamSelectPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitSubplanOperator(SubplanOperator op, Boolean topLevelOp) {
+ return new SubplanPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitUnionOperator(UnionAllOperator op, Boolean topLevelOp) {
+ if (topLevelOp) {
+ return new UnionAllPOperator();
+ } else {
+ return new MicroUnionAllPOperator();
+ }
+ }
+
+ @Override
+ public IPhysicalOperator visitIntersectOperator(IntersectOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ if (topLevelOp) {
+ return new IntersectPOperator();
+ } else {
+ throw AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, op.getSourceLocation(),
+ op.getOperatorTag().toString() + " (micro)");
+ }
+ }
+
+ @Override
+ public IPhysicalOperator visitUnnestOperator(UnnestOperator op, Boolean topLevelOp) {
+ return new UnnestPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Boolean topLevelOp) {
+ return new LeftOuterUnnestPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitDataScanOperator(DataSourceScanOperator scan, Boolean topLevelOp) {
+ IDataSource dataSource = scan.getDataSource();
+ DataSourceScanPOperator dss = new DataSourceScanPOperator(dataSource);
+ if (dataSource.isScanAccessPathALeaf()) {
+ dss.disableJobGenBelowMe();
+ }
+ return dss;
+ }
+
+ @Override
+ public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean topLevelOp) {
+ return new SinkWritePOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitDistributeResultOperator(DistributeResultOperator op, Boolean topLevelOp) {
+ return new DistributeResultPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitWriteResultOperator(WriteResultOperator opLoad, Boolean topLevelOp) {
+ List<LogicalVariable> keys = new ArrayList<>();
+ List<LogicalVariable> additionalFilteringKeys = null;
+ LogicalVariable payload = getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getKeyExpressions(), keys);
+ if (opLoad.getAdditionalFilteringExpressions() != null) {
+ additionalFilteringKeys = new ArrayList<>();
+ getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
+ }
+ return new WriteResultPOperator(opLoad.getDataSource(), payload, keys, additionalFilteringKeys);
+ }
+
+ @Override
+ public IPhysicalOperator visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator opLoad,
+ Boolean topLevelOp) {
+ // Primary index
+ List<LogicalVariable> keys = new ArrayList<>();
+ List<LogicalVariable> additionalFilteringKeys = null;
+ List<LogicalVariable> additionalNonFilterVariables = null;
+ if (opLoad.getAdditionalNonFilteringExpressions() != null) {
+ additionalNonFilterVariables = new ArrayList<>();
+ getKeys(opLoad.getAdditionalNonFilteringExpressions(), additionalNonFilterVariables);
+ }
+ LogicalVariable payload =
+ getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getPrimaryKeyExpressions(), keys);
+ if (opLoad.getAdditionalFilteringExpressions() != null) {
+ additionalFilteringKeys = new ArrayList<>();
+ getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
+ }
+ if (opLoad.isBulkload()) {
+ return new BulkloadPOperator(payload, keys, additionalFilteringKeys, additionalNonFilterVariables,
+ opLoad.getDataSource());
+ } else {
+ return new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys, opLoad.getDataSource(),
+ opLoad.getOperation(), additionalNonFilterVariables);
+ }
+ }
+
+ @Override
+ public IPhysicalOperator visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator opInsDel,
+ Boolean topLevelOp) {
+ // Secondary index
+ List<LogicalVariable> primaryKeys = new ArrayList<>();
+ List<LogicalVariable> secondaryKeys = new ArrayList<>();
+ List<LogicalVariable> additionalFilteringKeys = null;
+ getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
+ getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
+ if (opInsDel.getAdditionalFilteringExpressions() != null) {
+ additionalFilteringKeys = new ArrayList<>();
+ getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
+ }
+ if (opInsDel.isBulkload()) {
+ return new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
+ opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex());
+ } else {
+ LogicalVariable upsertIndicatorVar = null;
+ List<LogicalVariable> prevSecondaryKeys = null;
+ LogicalVariable prevAdditionalFilteringKey = null;
+ if (opInsDel.getOperation() == Kind.UPSERT) {
+ upsertIndicatorVar = getKey(opInsDel.getUpsertIndicatorExpr().getValue());
+ prevSecondaryKeys = new ArrayList<>();
+ getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
+ if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
+ prevAdditionalFilteringKey =
+ ((VariableReferenceExpression) (opInsDel.getPrevAdditionalFilteringExpression())
+ .getValue()).getVariableReference();
+ }
+ }
+ return new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
+ opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(), upsertIndicatorVar,
+ prevSecondaryKeys, prevAdditionalFilteringKey,
+ opInsDel.getNumberOfAdditionalNonFilteringFields());
+ }
+ }
+
+ @Override
+ public IPhysicalOperator visitTokenizeOperator(TokenizeOperator opTokenize, Boolean topLevelOp)
+ throws AlgebricksException {
+ List<LogicalVariable> primaryKeys = new ArrayList<>();
+ List<LogicalVariable> secondaryKeys = new ArrayList<>();
+ getKeys(opTokenize.getPrimaryKeyExpressions(), primaryKeys);
+ getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
+ // Tokenize Operator only operates with a bulk load on a data set with an index
+ if (!opTokenize.isBulkload()) {
+ throw AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, opTokenize.getSourceLocation(),
+ opTokenize.getOperatorTag().toString() + " (no bulkload)");
+ }
+ return new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize.getDataSourceIndex());
+ }
+
+ @Override
+ public IPhysicalOperator visitSinkOperator(SinkOperator op, Boolean topLevelOp) {
+ return new SinkPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitForwardOperator(ForwardOperator op, Boolean topLevelOp) {
+ return new SortForwardPOperator();
+ }
+
+ @Override
+ public final IPhysicalOperator visitWindowOperator(WindowOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ ensureAllVariables(op.getPartitionExpressions(), v -> v);
+ ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
+ return createWindowPOperator(op);
+ }
+
+ protected WindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException {
+ return new WindowPOperator(op.getPartitionVarList(), true, op.getOrderColumnList(), false, false, false,
+ context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ }
+
+ // Physical operators for these operators must have been set already by rules that introduced them
+
+ @Override
+ public IPhysicalOperator visitDelegateOperator(DelegateOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET, op.getSourceLocation(),
+ op.getOperatorTag());
+ }
+
+ @Override
+ public IPhysicalOperator visitExchangeOperator(ExchangeOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET, op.getSourceLocation(),
+ op.getOperatorTag());
+ }
+
+ @Override
+ public IPhysicalOperator visitMaterializeOperator(MaterializeOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET, op.getSourceLocation(),
+ op.getOperatorTag());
+ }
+
+ // Physical operators for these operators cannot be instantiated by Algebricks
+
+ @Override
+ public IPhysicalOperator visitUnnestMapOperator(UnnestMapOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ throw AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, op.getSourceLocation(),
+ op.getOperatorTag());
+ }
+
+ @Override
+ public IPhysicalOperator visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ throw AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, op.getSourceLocation(),
+ op.getOperatorTag());
+ }
+
+ // Helper methods
+
+ private static void getKeys(List<Mutable<ILogicalExpression>> keyExpressions, List<LogicalVariable> keys) {
+ for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
+ keys.add(getKey(kExpr.getValue()));
+ }
+ }
+
+ private static LogicalVariable getKey(ILogicalExpression keyExpression) {
+ if (keyExpression.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
throw new NotImplementedException();
}
- keys.add(((VariableReferenceExpression) e).getVariableReference());
- }
- return payload;
- }
-
- private static boolean generateMergeAggregationExpressions(GroupByOperator gby, IOptimizationContext context)
- throws AlgebricksException {
- if (gby.getNestedPlans().size() != 1) {
- //External/Sort group-by currently works only for one nested plan with one root containing
- //an aggregate and a nested-tuple-source.
- throw new AlgebricksException(
- "External group-by currently works only for one nested plan with one root containing"
- + "an aggregate and a nested-tuple-source.");
- }
- ILogicalPlan p0 = gby.getNestedPlans().get(0);
- if (p0.getRoots().size() != 1) {
- //External/Sort group-by currently works only for one nested plan with one root containing
- //an aggregate and a nested-tuple-source.
- throw new AlgebricksException(
- "External group-by currently works only for one nested plan with one root containing"
- + "an aggregate and a nested-tuple-source.");
- }
- IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
- context.getMergeAggregationExpressionFactory();
- Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
- AbstractLogicalOperator r0Logical = (AbstractLogicalOperator) r0.getValue();
- if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
- return false;
+ return ((VariableReferenceExpression) keyExpression).getVariableReference();
}
- // Check whether there are multiple aggregates in the sub plan.
- ILogicalOperator r1Logical = r0Logical;
- while (r1Logical.hasInputs()) {
- r1Logical = r1Logical.getInputs().get(0).getValue();
- if (r1Logical.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ private static LogicalVariable getKeysAndLoad(Mutable<ILogicalExpression> payloadExpr,
+ List<Mutable<ILogicalExpression>> keyExpressions, List<LogicalVariable> keys) {
+ LogicalVariable payload;
+ if (payloadExpr.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new NotImplementedException();
+ }
+ payload = ((VariableReferenceExpression) payloadExpr.getValue()).getVariableReference();
+
+ for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
+ ILogicalExpression e = kExpr.getValue();
+ if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new NotImplementedException();
+ }
+ keys.add(((VariableReferenceExpression) e).getVariableReference());
+ }
+ return payload;
+ }
+
+ private boolean generateMergeAggregationExpressions(GroupByOperator gby) throws AlgebricksException {
+ if (gby.getNestedPlans().size() != 1) {
+ //External/Sort group-by currently works only for one nested plan with one root containing
+ //an aggregate and a nested-tuple-source.
+ throw new AlgebricksException(
+ "External group-by currently works only for one nested plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
+ }
+ ILogicalPlan p0 = gby.getNestedPlans().get(0);
+ if (p0.getRoots().size() != 1) {
+ //External/Sort group-by currently works only for one nested plan with one root containing
+ //an aggregate and a nested-tuple-source.
+ throw new AlgebricksException(
+ "External group-by currently works only for one nested plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
+ }
+ IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
+ context.getMergeAggregationExpressionFactory();
+ Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+ AbstractLogicalOperator r0Logical = (AbstractLogicalOperator) r0.getValue();
+ if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
return false;
}
+
+ // Check whether there are multiple aggregates in the sub plan.
+ ILogicalOperator r1Logical = r0Logical;
+ while (r1Logical.hasInputs()) {
+ r1Logical = r1Logical.getInputs().get(0).getValue();
+ if (r1Logical.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ return false;
+ }
+ }
+
+ AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+ List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
+ List<LogicalVariable> originalAggVars = aggOp.getVariables();
+ int n = aggOp.getExpressions().size();
+ List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
+ .createMergeAggregation(originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
+ if (mergeExpr == null) {
+ return false;
+ }
+ mergeExpressionRefs.add(new MutableObject<>(mergeExpr));
+ }
+ aggOp.setMergeExpressions(mergeExpressionRefs);
+ return true;
}
- AggregateOperator aggOp = (AggregateOperator) r0.getValue();
- List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
- List<LogicalVariable> originalAggVars = aggOp.getVariables();
- int n = aggOp.getExpressions().size();
- List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
- for (int i = 0; i < n; i++) {
- ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
- .createMergeAggregation(originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
- if (mergeExpr == null) {
- return false;
+ static <E> void ensureAllVariables(Collection<E> exprList, Function<E, Mutable<ILogicalExpression>> accessor)
+ throws AlgebricksException {
+ for (E item : exprList) {
+ ILogicalExpression e = accessor.apply(item).getValue();
+ if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw AlgebricksException.create(ErrorCode.EXPR_NOT_NORMALIZED, e.getSourceLocation());
+ }
}
- mergeExpressionRefs.add(new MutableObject<ILogicalExpression>(mergeExpr));
}
- aggOp.setMergeExpressions(mergeExpressionRefs);
- return true;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index bf34664..a31aef2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -161,7 +161,8 @@
public static final int CANNOT_COMPOSE_PART_CONSTRAINTS = 10001;
public static final int PHYS_OPERATOR_NOT_SET = 10002;
public static final int DESCRIPTOR_GENERATION_ERROR = 10003;
- public static final int ORDER_EXPR_NOT_NORMALIZED = 10004;
+ public static final int EXPR_NOT_NORMALIZED = 10004;
+ public static final int OPERATOR_NOT_IMPLEMENTED = 10005;
private static class Holder {
private static final Map<Integer, String> errorMessageMap;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index b4f7973..8e3b85e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -143,4 +143,5 @@
10001 = Cannot compose partition constraint %1$s with %2$s
10002 = Physical operator not set for operator: %1$s
10003 = Could not generate operator descriptor for operator %1$s
-10004 = Order expression has not been normalized
+10004 = Expression has not been normalized
+10005 = Operator is not implemented: %1$s