[NO ISSUE][COMP] Support batch assign for external functions
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add compiler support for batch invocation of external functions
Change-Id: I1ed1f5c51628d996327de843f4977d083e9b4bd4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10006
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AssignBatchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AssignBatchPOperator.java
new file mode 100644
index 0000000..0ec5ee7
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AssignBatchPOperator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.library.ExternalFunctionDescriptorProvider;
+import org.apache.asterix.external.operators.ExternalAssignBatchRuntimeFactory;
+import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil;
+import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.operators.AbstractAssignPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public final class AssignBatchPOperator extends AbstractAssignPOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.ASSIGN_BATCH;
+ }
+
+ @Override
+ protected IPushRuntimeFactory createRuntimeFactory(JobGenContext context, AssignOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, int[] outColumns, int[] projectionList)
+ throws AlgebricksException {
+ IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue());
+ List<Mutable<ILogicalExpression>> exprList = op.getExpressions();
+ int exprCount = exprList.size();
+ IExternalFunctionDescriptor[] fnDescs = new IExternalFunctionDescriptor[exprCount];
+ int[][] fnArgColumns = new int[exprCount][];
+ for (int i = 0; i < exprCount; i++) {
+ Mutable<ILogicalExpression> exprRef = exprList.get(i);
+ ILogicalExpression expr = exprRef.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+ String.valueOf(expr.getExpressionTag()));
+ }
+ AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
+ IFunctionInfo fi = callExpr.getFunctionInfo();
+ if (!ExternalFunctionCompilerUtil.supportsBatchInvocation(callExpr.getKind(), fi)) {
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+ fi.toString());
+ }
+ fnDescs[i] = ExternalFunctionDescriptorProvider.resolveExternalFunction(callExpr, inputTypeEnv, context);
+ fnArgColumns[i] = getColumns(callExpr.getArguments(), opSchema, op.getSourceLocation());
+ }
+
+ return new ExternalAssignBatchRuntimeFactory(outColumns, fnDescs, fnArgColumns, projectionList);
+ }
+
+ private int[] getColumns(List<Mutable<ILogicalExpression>> exprList, IOperatorSchema opSchema,
+ SourceLocation sourceLoc) throws CompilationException {
+ int n = exprList.size();
+ int[] columns = new int[n];
+ for (int i = 0; i < n; i++) {
+ ILogicalExpression expr = exprList.get(i).getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc,
+ String.valueOf(expr.getExpressionTag()));
+ }
+ VariableReferenceExpression argVarRef = (VariableReferenceExpression) expr;
+ LogicalVariable argVar = argVarRef.getVariableReference();
+ int argColumn = opSchema.findVariable(argVar);
+ if (argColumn < 0) {
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, String.valueOf(argVar));
+ }
+ columns[i] = argColumn;
+ }
+ return columns;
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java
index 74656e4..2a662d0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java
@@ -30,7 +30,6 @@
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionManager;
import org.apache.asterix.om.functions.IFunctionTypeInferer;
-import org.apache.asterix.runtime.functions.FunctionTypeInferers;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -138,15 +137,9 @@
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException {
IScalarEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
- IFunctionDescriptor fd = null;
- if (expr.getFunctionInfo() instanceof IExternalFunctionInfo) {
- fd = ExternalFunctionDescriptorProvider
- .getExternalFunctionDescriptor((IExternalFunctionInfo) expr.getFunctionInfo());
- CompilerProperties props = ((IApplicationContext) context.getAppContext()).getCompilerProperties();
- FunctionTypeInferers.SET_ARGUMENTS_TYPE.infer(expr, fd, env, props);
- } else {
- fd = resolveFunction(expr, env, context);
- }
+ IFunctionDescriptor fd = expr.getFunctionInfo() instanceof IExternalFunctionInfo
+ ? ExternalFunctionDescriptorProvider.resolveExternalFunction(expr, env, context)
+ : resolveFunction(expr, env, context);
return fd.createEvaluatorFactory(args);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 4a2b629..d2fe2f5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -40,6 +40,7 @@
import org.apache.asterix.optimizer.rules.ConstantFoldingRule;
import org.apache.asterix.optimizer.rules.CountVarToCountOneRule;
import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule;
+import org.apache.asterix.optimizer.rules.ExtractBatchableExternalFunctionCallsRule;
import org.apache.asterix.optimizer.rules.ExtractDistinctByExpressionsRule;
import org.apache.asterix.optimizer.rules.ExtractOrderExpressionsRule;
import org.apache.asterix.optimizer.rules.ExtractWindowExpressionsRule;
@@ -284,7 +285,7 @@
public static final List<IAlgebraicRewriteRule> buildConsolidationRuleCollection() {
List<IAlgebraicRewriteRule> consolidation = new LinkedList<>();
consolidation.add(new ConsolidateSelectsRule());
- consolidation.add(new ConsolidateAssignsRule());
+ consolidation.add(new ConsolidateAssignsRule(false));
consolidation.add(new InlineAssignIntoAggregateRule());
consolidation.add(new RewriteDistinctAggregateRule());
// The following rule should run after RewriteDistinctAggregateRule
@@ -353,6 +354,7 @@
public static final List<IAlgebraicRewriteRule> buildPhysicalRewritesAllLevelsRuleCollection() {
List<IAlgebraicRewriteRule> physicalRewritesAllLevels = new LinkedList<>();
physicalRewritesAllLevels.add(new PullSelectOutOfEqJoin());
+ physicalRewritesAllLevels.add(new ExtractBatchableExternalFunctionCallsRule());
//Turned off the following rule for now not to change OptimizerTest results.
physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
@@ -369,7 +371,7 @@
physicalRewritesAllLevels.add(new IntroduceMaterializationForInsertWithSelfScanRule());
physicalRewritesAllLevels.add(new InlineSingleReferenceVariablesRule());
physicalRewritesAllLevels.add(new RemoveUnusedAssignAndAggregateRule());
- physicalRewritesAllLevels.add(new ConsolidateAssignsRule());
+ physicalRewritesAllLevels.add(new ConsolidateAssignsRule(true));
// After adding projects, we may need need to set physical operators again.
physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
return physicalRewritesAllLevels;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ExtractBatchableExternalFunctionCallsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ExtractBatchableExternalFunctionCallsRule.java
new file mode 100644
index 0000000..1e5d805
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ExtractBatchableExternalFunctionCallsRule.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class ExtractBatchableExternalFunctionCallsRule implements IAlgebraicRewriteRule {
+
+ private final ExtractFunctionCallsVisitor extractVisitor = new ExtractFunctionCallsVisitor();
+
+ private Boolean isRuleEnabled;
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ if (isRuleEnabled == null) {
+ isRuleEnabled = SetAsterixPhysicalOperatorsRule.isBatchAssignEnabled(context);
+ }
+ if (!isRuleEnabled) {
+ return false;
+ }
+
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ List<Mutable<ILogicalExpression>> assignTopExprRefs = Collections.emptyList();
+ switch (op.getOperatorTag()) {
+ case ASSIGN:
+ assignTopExprRefs = ((AssignOperator) op).getExpressions();
+ break;
+ case SELECT:
+ break;
+ default:
+ return false;
+ }
+
+ if (context.checkIfInDontApplySet(this, op)) {
+ return false;
+ }
+ context.addToDontApplySet(this, op);
+
+ extractVisitor.reset(context, assignTopExprRefs);
+ if (!op.acceptExpressionTransform(extractVisitor)) {
+ return false;
+ }
+ SourceLocation sourceLoc = op.getSourceLocation();
+
+ ILogicalOperator inputOp = op.getInputs().get(0).getValue();
+ for (int i = 0, ln = extractVisitor.assignVars.size(); i < ln; i++) {
+ List<LogicalVariable> assignVarList = extractVisitor.assignVars.get(i);
+ List<Mutable<ILogicalExpression>> assignExprList = extractVisitor.assignExprs.get(i);
+ AssignOperator assignOp = new AssignOperator(assignVarList, assignExprList);
+ assignOp.setSourceLocation(sourceLoc);
+ assignOp.getInputs().add(new MutableObject<>(inputOp));
+ context.computeAndSetTypeEnvironmentForOperator(assignOp);
+ assignOp.recomputeSchema();
+ OperatorPropertiesUtil.markMovable(assignOp, false);
+
+ context.addToDontApplySet(this, assignOp);
+ for (LogicalVariable assignVar : assignVarList) {
+ context.addNotToBeInlinedVar(assignVar);
+ }
+
+ inputOp = assignOp;
+ }
+
+ op.getInputs().clear();
+ op.getInputs().add(new MutableObject<>(inputOp));
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ op.recomputeSchema();
+ return true;
+ }
+
+ private static final class ExtractFunctionCallsVisitor implements ILogicalExpressionReferenceTransform {
+
+ private final List<List<LogicalVariable>> assignVars = new ArrayList<>();
+
+ private final List<List<Mutable<ILogicalExpression>>> assignExprs = new ArrayList<>();
+
+ private final List<LogicalVariable> usedVarList = new ArrayList<>();
+
+ private IOptimizationContext context;
+
+ private List<Mutable<ILogicalExpression>> dontExtractFromExprRefs;
+
+ public void reset(IOptimizationContext context, List<Mutable<ILogicalExpression>> dontExtractFromExprRefs) {
+ this.context = context;
+ this.dontExtractFromExprRefs = dontExtractFromExprRefs;
+ assignVars.clear();
+ assignExprs.clear();
+ }
+
+ @Override
+ public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+ ILogicalExpression expr = exprRef.getValue();
+ switch (expr.getExpressionTag()) {
+ case FUNCTION_CALL:
+ AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
+ boolean applied = false;
+ for (Mutable<ILogicalExpression> argRef : callExpr.getArguments()) {
+ applied |= transform(argRef);
+ }
+ AbstractFunctionCallExpression.FunctionKind fnKind = callExpr.getKind();
+ IFunctionInfo fnInfo = callExpr.getFunctionInfo();
+ if (ExternalFunctionCompilerUtil.supportsBatchInvocation(fnKind, fnInfo)
+ && callExpr.isFunctional()) {
+ // need to extract non-variable arguments into separate ASSIGNS
+ // because batched assign can only operate on columns
+ for (Mutable<ILogicalExpression> argRef : callExpr.getArguments()) {
+ ILogicalExpression argExpr = argRef.getValue();
+ if (argExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ LogicalVariable newArgVar = context.newVar();
+ VariableReferenceExpression newArgVarRef = new VariableReferenceExpression(newArgVar);
+ newArgVarRef.setSourceLocation(expr.getSourceLocation());
+ saveAssignVar(newArgVar, argExpr);
+ argRef.setValue(newArgVarRef);
+ applied = true;
+ }
+ }
+ // need extract function call itself into a separate ASSIGN
+ // (unless it's already a top level expression of the ASSIGN operator we're visiting)
+ boolean dontExtractExprRef = indexOf(dontExtractFromExprRefs, exprRef) >= 0;
+ if (!dontExtractExprRef) {
+ LogicalVariable newVar = context.newVar();
+ VariableReferenceExpression newVarRef = new VariableReferenceExpression(newVar);
+ newVarRef.setSourceLocation(expr.getSourceLocation());
+ saveAssignVar(newVar, expr);
+ exprRef.setValue(newVarRef);
+ applied = true;
+ }
+ }
+ return applied;
+ case VARIABLE:
+ case CONSTANT:
+ return false;
+ default:
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, expr.getSourceLocation(),
+ expr.getExpressionTag().toString());
+ }
+ }
+
+ private void saveAssignVar(LogicalVariable var, ILogicalExpression expr) {
+ List<LogicalVariable> assignVarList = null;
+ List<Mutable<ILogicalExpression>> assignExprList = null;
+
+ if (!assignVars.isEmpty()) {
+ usedVarList.clear();
+ expr.getUsedVariables(usedVarList);
+ for (int i = 0, ln = assignVars.size(); i < ln; i++) {
+ List<LogicalVariable> candidateVarList = assignVars.get(i);
+ if (OperatorPropertiesUtil.disjoint(candidateVarList, usedVarList)) {
+ assignVarList = candidateVarList;
+ assignExprList = assignExprs.get(i);
+ break;
+ }
+ }
+ }
+
+ if (assignVarList == null) {
+ // first time, or couldn't find a disjoint var list
+ assignVarList = new ArrayList<>();
+ assignExprList = new ArrayList<>();
+ assignVars.add(assignVarList);
+ assignExprs.add(assignExprList);
+ }
+
+ assignVarList.add(var);
+ assignExprList.add(new MutableObject<>(expr));
+ }
+
+ public static int indexOf(List<Mutable<ILogicalExpression>> exprList, Mutable<ILogicalExpression> exprRef) {
+ return OperatorManipulationUtil.indexOf(exprList,
+ (listItemExprRef, paramExprRef) -> listItemExprRef == paramExprRef, exprRef);
+ }
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index e662737..ea51fc3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.asterix.algebra.operators.physical.AssignBatchPOperator;
import org.apache.asterix.algebra.operators.physical.BTreeSearchPOperator;
import org.apache.asterix.algebra.operators.physical.InvertedIndexPOperator;
import org.apache.asterix.algebra.operators.physical.RTreeSearchPOperator;
@@ -30,6 +31,7 @@
import org.apache.asterix.metadata.declared.DataSourceId;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.optimizer.base.AnalysisUtil;
import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
@@ -49,10 +51,12 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
@@ -69,16 +73,47 @@
public final class SetAsterixPhysicalOperatorsRule extends SetAlgebricksPhysicalOperatorsRule {
+ // Disable ASSIGN_BATCH physical operator if this option is set to 'false'
+ public static final String REWRITE_ATTEMPT_BATCH_ASSIGN = "rewrite_attempt_batch_assign";
+ static final boolean REWRITE_ATTEMPT_BATCH_ASSIGN_DEFAULT = false;
+
@Override
protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean> createPhysicalOperatorFactoryVisitor(
IOptimizationContext context) {
return new AsterixPhysicalOperatorFactoryVisitor(context);
}
+ static boolean isBatchAssignEnabled(IOptimizationContext context) {
+ MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+ return metadataProvider.getBooleanProperty(REWRITE_ATTEMPT_BATCH_ASSIGN, REWRITE_ATTEMPT_BATCH_ASSIGN_DEFAULT);
+ }
+
private static class AsterixPhysicalOperatorFactoryVisitor extends AlgebricksPhysicalOperatorFactoryVisitor {
+ private final boolean isBatchAssignEnabled;
+
private AsterixPhysicalOperatorFactoryVisitor(IOptimizationContext context) {
super(context);
+ isBatchAssignEnabled = isBatchAssignEnabled(context);
+ }
+
+ @Override
+ public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) throws AlgebricksException {
+ List<Mutable<ILogicalExpression>> exprList = op.getExpressions();
+ boolean batchMode = isBatchAssignEnabled && exprList.size() > 0 && allBatchableFunctionCalls(exprList);
+ if (batchMode) {
+ // disable inlining of variable arguments
+ for (Mutable<ILogicalExpression> exprRef : exprList) {
+ AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) exprRef.getValue();
+ for (Mutable<ILogicalExpression> argRef : callExpr.getArguments()) {
+ LogicalVariable var = ((VariableReferenceExpression) argRef.getValue()).getVariableReference();
+ context.addNotToBeInlinedVar(var);
+ }
+ }
+ return new AssignBatchPOperator();
+ } else {
+ return super.visitAssignOperator(op, topLevelOp);
+ }
}
@Override
@@ -280,5 +315,31 @@
return new WindowStreamPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList());
}
}
+
+ private boolean allBatchableFunctionCalls(List<Mutable<ILogicalExpression>> exprList)
+ throws CompilationException {
+ for (Mutable<ILogicalExpression> exprRef : exprList) {
+ if (!isBatchableFunctionCall(exprRef.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean isBatchableFunctionCall(ILogicalExpression expr) throws CompilationException {
+ if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return false;
+ }
+ AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
+ if (!ExternalFunctionCompilerUtil.supportsBatchInvocation(callExpr.getKind(), callExpr.getFunctionInfo())) {
+ return false;
+ }
+ for (Mutable<ILogicalExpression> argRef : callExpr.getArguments()) {
+ if (argRef.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ return false;
+ }
+ }
+ return true;
+ }
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 9088db6..d60047e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -72,6 +72,7 @@
import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
import org.apache.asterix.optimizer.base.FuzzyUtils;
import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule;
+import org.apache.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.ExecutionPlans;
@@ -141,7 +142,8 @@
StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME,
FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION,
SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type",
- DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION);
+ DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION,
+ SetAsterixPhysicalOperatorsRule.REWRITE_ATTEMPT_BATCH_ASSIGN);
private final IRewriterFactory rewriterFactory;
private final IAstPrintVisitorFactory astPrintVisitorFactory;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java
index 8ac507f..dc51c5f 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java
@@ -18,15 +18,30 @@
*/
package org.apache.asterix.external.library;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
public class ExternalFunctionDescriptorProvider {
- public static IFunctionDescriptor getExternalFunctionDescriptor(IExternalFunctionInfo finfo)
+ public static IExternalFunctionDescriptor resolveExternalFunction(AbstractFunctionCallExpression expr,
+ IVariableTypeEnvironment inputTypeEnv, JobGenContext context) throws AlgebricksException {
+ IExternalFunctionDescriptor fd = getExternalFunctionDescriptor((IExternalFunctionInfo) expr.getFunctionInfo());
+ CompilerProperties props = ((IApplicationContext) context.getAppContext()).getCompilerProperties();
+ FunctionTypeInferers.SET_ARGUMENTS_TYPE.infer(expr, fd, inputTypeEnv, props);
+ fd.setSourceLocation(expr.getSourceLocation());
+ return fd;
+ }
+
+ private static IExternalFunctionDescriptor getExternalFunctionDescriptor(IExternalFunctionInfo finfo)
throws AlgebricksException {
switch (finfo.getKind()) {
case SCALAR:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
index 61ab3ea..63f0a13 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
@@ -19,15 +19,15 @@
package org.apache.asterix.external.library;
+import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
public class ExternalScalarFunctionDescriptor extends AbstractScalarFunctionDynamicDescriptor
- implements IFunctionDescriptor {
+ implements IExternalFunctionDescriptor {
private static final long serialVersionUID = 2L;
private final IExternalFunctionInfo finfo;
@@ -54,4 +54,14 @@
public FunctionIdentifier getIdentifier() {
return finfo.getFunctionIdentifier();
}
+
+ @Override
+ public IExternalFunctionInfo getFunctionInfo() {
+ return finfo;
+ }
+
+ @Override
+ public IAType[] getArgumentTypes() {
+ return argTypes;
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
new file mode 100644
index 0000000..44a17a9
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private int[] outColumns;
+ private final IExternalFunctionDescriptor[] fnDescs;
+ private final int[][] fnArgColumns;
+
+ public ExternalAssignBatchRuntimeFactory(int[] outColumns, IExternalFunctionDescriptor[] fnDescs,
+ int[][] fnArgColumns, int[] projectionList) {
+ super(projectionList);
+ this.outColumns = outColumns;
+ this.fnDescs = fnDescs;
+ this.fnArgColumns = fnArgColumns;
+ }
+
+ @Override
+ public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ throw new HyracksDataException(ErrorCode.OPERATOR_NOT_IMPLEMENTED, sourceLoc,
+ PhysicalOperatorTag.ASSIGN_BATCH.toString());
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
index 029b13a..f900c92 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
@@ -29,6 +29,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.BuiltinTypeMap;
import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
@@ -167,4 +168,23 @@
String.valueOf(actualSize), language.name());
}
}
+
+ public static boolean supportsBatchInvocation(FunctionKind fnKind, IFunctionInfo fnInfo)
+ throws CompilationException {
+ if (fnKind != FunctionKind.SCALAR) {
+ return false;
+ }
+ if (!(fnInfo instanceof IExternalFunctionInfo)) {
+ return false;
+ }
+ ExternalFunctionLanguage language = ((IExternalFunctionInfo) fnInfo).getLanguage();
+ switch (language) {
+ case JAVA:
+ return false;
+ case PYTHON:
+ return false;
+ default:
+ throw new CompilationException(ErrorCode.METADATA_ERROR, language.name());
+ }
+ }
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionDescriptor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionDescriptor.java
new file mode 100644
index 0000000..8d362ce
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionDescriptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.functions;
+
+import org.apache.asterix.om.types.IAType;
+
+public interface IExternalFunctionDescriptor extends IFunctionDescriptor {
+
+ IExternalFunctionInfo getFunctionInfo();
+
+ IAType[] getArgumentTypes();
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index b7f6d62..d590f71 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -21,6 +21,7 @@
public enum PhysicalOperatorTag {
AGGREGATE,
ASSIGN,
+ ASSIGN_BATCH,
BROADCAST_EXCHANGE,
BTREE_SEARCH,
BULKLOAD,
@@ -33,7 +34,6 @@
FORWARD,
HASH_PARTITION_EXCHANGE,
HASH_PARTITION_MERGE_EXCHANGE,
- HDFS_READER,
HYBRID_HASH_JOIN,
IN_MEMORY_HASH_JOIN,
MICRO_STABLE_SORT,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/AbstractAssignPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/AbstractAssignPOperator.java
new file mode 100644
index 0000000..4466a25
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/AbstractAssignPOperator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public abstract class AbstractAssignPOperator extends AbstractPhysicalOperator {
+
+ protected boolean flushFramesRapidly;
+
+ protected String[] locations;
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AssignOperator assignOp = (AssignOperator) op;
+ ILogicalOperator op2 = op.getInputs().get(0).getValue();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ if (assignOp.getExplicitOrderingProperty() != null) {
+ deliveredProperties.getLocalProperties().add(assignOp.getExplicitOrderingProperty());
+ }
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ AssignOperator assign = (AssignOperator) op;
+ List<LogicalVariable> variables = assign.getVariables();
+ int[] outColumns = new int[variables.size()];
+ for (int i = 0; i < outColumns.length; i++) {
+ outColumns[i] = opSchema.findVariable(variables.get(i));
+ }
+
+ // TODO push projections into the operator
+ int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
+
+ IPushRuntimeFactory runtime =
+ createRuntimeFactory(context, assign, opSchema, inputSchemas, outColumns, projectionList);
+ runtime.setSourceLocation(assign.getSourceLocation());
+
+ // contribute one Asterix framewriter
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+ if (locations != null && locations.length > 0) {
+ AlgebricksAbsolutePartitionConstraint locationConstraint =
+ new AlgebricksAbsolutePartitionConstraint(locations);
+ builder.contributeMicroOperator(assign, runtime, recDesc, locationConstraint);
+ } else {
+ builder.contributeMicroOperator(assign, runtime, recDesc);
+ }
+ // and contribute one edge from its child
+ ILogicalOperator src = assign.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, assign, 0);
+ }
+
+ protected abstract IPushRuntimeFactory createRuntimeFactory(JobGenContext context, AssignOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, int[] outColumns, int[] projectionList)
+ throws AlgebricksException;
+
+ @Override
+ public boolean isMicroOperator() {
+ return true;
+ }
+
+ public void setRapidFrameFlush(boolean flushFramesRapidly) {
+ this.flushFramesRapidly = flushFramesRapidly;
+ }
+
+ public void setLocationConstraint(String[] locations) {
+ this.locations = locations;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 23fa1ee..5f927fd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -21,105 +21,35 @@
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.AbstractAssignPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-public class AssignPOperator extends AbstractPhysicalOperator {
-
- private boolean flushFramesRapidly;
- private String[] locations;
+public class AssignPOperator extends AbstractAssignPOperator {
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.ASSIGN;
}
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AssignOperator assignOp = (AssignOperator) op;
- ILogicalOperator op2 = op.getInputs().get(0).getValue();
- deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
- if (assignOp.getExplicitOrderingProperty() != null) {
- deliveredProperties.getLocalProperties().add(assignOp.getExplicitOrderingProperty());
- }
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
- return emptyUnaryRequirements();
- }
-
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ protected IPushRuntimeFactory createRuntimeFactory(JobGenContext context, AssignOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, int[] outColumns, int[] projectionList)
throws AlgebricksException {
- AssignOperator assign = (AssignOperator) op;
- List<LogicalVariable> variables = assign.getVariables();
- List<Mutable<ILogicalExpression>> expressions = assign.getExpressions();
- int[] outColumns = new int[variables.size()];
- for (int i = 0; i < outColumns.length; i++) {
- outColumns[i] = opSchema.findVariable(variables.get(i));
- }
+ List<Mutable<ILogicalExpression>> expressions = op.getExpressions();
IScalarEvaluatorFactory[] evalFactories = new IScalarEvaluatorFactory[expressions.size()];
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
for (int i = 0; i < evalFactories.length; i++) {
evalFactories[i] = expressionRuntimeProvider.createEvaluatorFactory(expressions.get(i).getValue(),
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
}
-
- // TODO push projections into the operator
- int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
-
- AssignRuntimeFactory runtime =
- new AssignRuntimeFactory(outColumns, evalFactories, projectionList, flushFramesRapidly);
- runtime.setSourceLocation(assign.getSourceLocation());
-
- // contribute one Asterix framewriter
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
- if (locations != null && locations.length > 0) {
- AlgebricksAbsolutePartitionConstraint locationConstraint =
- new AlgebricksAbsolutePartitionConstraint(locations);
- builder.contributeMicroOperator(assign, runtime, recDesc, locationConstraint);
- } else {
- builder.contributeMicroOperator(assign, runtime, recDesc);
- }
- // and contribute one edge from its child
- ILogicalOperator src = assign.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, assign, 0);
- }
-
- @Override
- public boolean isMicroOperator() {
- return true;
- }
-
- public void setRapidFrameFlush(boolean flushFramesRapidly) {
- this.flushFramesRapidly = flushFramesRapidly;
- }
-
- public void setLocationConstraint(String[] locations) {
- this.locations = locations;
- }
-
- @Override
- public boolean expensiveThanMaterialization() {
- return false;
+ return new AssignRuntimeFactory(outColumns, evalFactories, projectionList, flushFramesRapidly);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java
index 3f7a0c1..5e10819 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java
@@ -24,7 +24,6 @@
import java.util.Deque;
import java.util.List;
import java.util.function.BiFunction;
-import java.util.function.BiPredicate;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -34,6 +33,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
@@ -250,21 +250,11 @@
}
private static <T> int findItem(List<T> list, T item) {
- return indexOf(list, (listItem, paramItem) -> listItem == paramItem, item);
+ return OperatorManipulationUtil.indexOf(list, (listItem, paramItem) -> listItem == paramItem, item);
}
private static <T> int findNonNull(List<T> list) {
- return indexOf(list, (listItem, none) -> listItem != null, null);
- }
-
- private static <T, U> int indexOf(List<T> list, BiPredicate<T, U> predicate, U predicateParam) {
- for (int i = 0, n = list.size(); i < n; i++) {
- T listItem = list.get(i);
- if (predicate.test(listItem, predicateParam)) {
- return i;
- }
- }
- return -1;
+ return OperatorManipulationUtil.indexOf(list, (listItem, none) -> listItem != null, null);
}
static String printOperator(Mutable<ILogicalOperator> opRef, IPlanPrettyPrinter printer) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index dd109ff..aa2ecdc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.BiPredicate;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -385,8 +386,21 @@
* @return operator position in the given list or {@code -1} if not found
*/
public static int indexOf(List<Mutable<ILogicalOperator>> list, ILogicalOperator op) {
+ return indexOf(list, (listItemOpRef, paramOp) -> listItemOpRef.getValue() == paramOp, op);
+ }
+
+ /**
+ * Find an item a given list
+ *
+ * @param list list to search in
+ * @param predicate predicate to test
+ * @param predicateParam parameter to pass to the predicate
+ * @return item position in the given list or {@code -1} if not found
+ */
+ public static <T, U> int indexOf(List<T> list, BiPredicate<T, U> predicate, U predicateParam) {
for (int i = 0, ln = list.size(); i < ln; i++) {
- if (list.get(i).getValue() == op) {
+ T listItem = list.get(i);
+ if (predicate.test(listItem, predicateParam)) {
return i;
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java
index bbb6cbd..bf9ab6a 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java
@@ -35,6 +35,12 @@
public class ConsolidateAssignsRule implements IAlgebraicRewriteRule {
+ private final boolean recomputeSchema;
+
+ public ConsolidateAssignsRule(boolean recomputeSchema) {
+ this.recomputeSchema = recomputeSchema;
+ }
+
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
return false;
@@ -76,6 +82,9 @@
asgnInpList.clear();
asgnInpList.add(botOpRef);
context.computeAndSetTypeEnvironmentForOperator(assign1);
+ if (recomputeSchema) {
+ assign1.recomputeSchema();
+ }
return true;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 7243827..b95971f 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -190,7 +190,7 @@
}
@Override
- public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) {
+ public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) throws AlgebricksException {
return new AssignPOperator();
}