[NO ISSUE][COMP] Support batch assign for external functions

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add compiler support for batch invocation of external functions

Change-Id: I1ed1f5c51628d996327de843f4977d083e9b4bd4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10006
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AssignBatchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AssignBatchPOperator.java
new file mode 100644
index 0000000..0ec5ee7
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AssignBatchPOperator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.library.ExternalFunctionDescriptorProvider;
+import org.apache.asterix.external.operators.ExternalAssignBatchRuntimeFactory;
+import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil;
+import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.operators.AbstractAssignPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public final class AssignBatchPOperator extends AbstractAssignPOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.ASSIGN_BATCH;
+    }
+
+    @Override
+    protected IPushRuntimeFactory createRuntimeFactory(JobGenContext context, AssignOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, int[] outColumns, int[] projectionList)
+            throws AlgebricksException {
+        IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue());
+        List<Mutable<ILogicalExpression>> exprList = op.getExpressions();
+        int exprCount = exprList.size();
+        IExternalFunctionDescriptor[] fnDescs = new IExternalFunctionDescriptor[exprCount];
+        int[][] fnArgColumns = new int[exprCount][];
+        for (int i = 0; i < exprCount; i++) {
+            Mutable<ILogicalExpression> exprRef = exprList.get(i);
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+                        String.valueOf(expr.getExpressionTag()));
+            }
+            AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
+            IFunctionInfo fi = callExpr.getFunctionInfo();
+            if (!ExternalFunctionCompilerUtil.supportsBatchInvocation(callExpr.getKind(), fi)) {
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+                        fi.toString());
+            }
+            fnDescs[i] = ExternalFunctionDescriptorProvider.resolveExternalFunction(callExpr, inputTypeEnv, context);
+            fnArgColumns[i] = getColumns(callExpr.getArguments(), opSchema, op.getSourceLocation());
+        }
+
+        return new ExternalAssignBatchRuntimeFactory(outColumns, fnDescs, fnArgColumns, projectionList);
+    }
+
+    private int[] getColumns(List<Mutable<ILogicalExpression>> exprList, IOperatorSchema opSchema,
+            SourceLocation sourceLoc) throws CompilationException {
+        int n = exprList.size();
+        int[] columns = new int[n];
+        for (int i = 0; i < n; i++) {
+            ILogicalExpression expr = exprList.get(i).getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc,
+                        String.valueOf(expr.getExpressionTag()));
+            }
+            VariableReferenceExpression argVarRef = (VariableReferenceExpression) expr;
+            LogicalVariable argVar = argVarRef.getVariableReference();
+            int argColumn = opSchema.findVariable(argVar);
+            if (argColumn < 0) {
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, String.valueOf(argVar));
+            }
+            columns[i] = argColumn;
+        }
+        return columns;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java
index 74656e4..2a662d0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java
@@ -30,7 +30,6 @@
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionManager;
 import org.apache.asterix.om.functions.IFunctionTypeInferer;
-import org.apache.asterix.runtime.functions.FunctionTypeInferers;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -138,15 +137,9 @@
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
             throws AlgebricksException {
         IScalarEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
-        IFunctionDescriptor fd = null;
-        if (expr.getFunctionInfo() instanceof IExternalFunctionInfo) {
-            fd = ExternalFunctionDescriptorProvider
-                    .getExternalFunctionDescriptor((IExternalFunctionInfo) expr.getFunctionInfo());
-            CompilerProperties props = ((IApplicationContext) context.getAppContext()).getCompilerProperties();
-            FunctionTypeInferers.SET_ARGUMENTS_TYPE.infer(expr, fd, env, props);
-        } else {
-            fd = resolveFunction(expr, env, context);
-        }
+        IFunctionDescriptor fd = expr.getFunctionInfo() instanceof IExternalFunctionInfo
+                ? ExternalFunctionDescriptorProvider.resolveExternalFunction(expr, env, context)
+                : resolveFunction(expr, env, context);
         return fd.createEvaluatorFactory(args);
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 4a2b629..d2fe2f5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -40,6 +40,7 @@
 import org.apache.asterix.optimizer.rules.ConstantFoldingRule;
 import org.apache.asterix.optimizer.rules.CountVarToCountOneRule;
 import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule;
+import org.apache.asterix.optimizer.rules.ExtractBatchableExternalFunctionCallsRule;
 import org.apache.asterix.optimizer.rules.ExtractDistinctByExpressionsRule;
 import org.apache.asterix.optimizer.rules.ExtractOrderExpressionsRule;
 import org.apache.asterix.optimizer.rules.ExtractWindowExpressionsRule;
@@ -284,7 +285,7 @@
     public static final List<IAlgebraicRewriteRule> buildConsolidationRuleCollection() {
         List<IAlgebraicRewriteRule> consolidation = new LinkedList<>();
         consolidation.add(new ConsolidateSelectsRule());
-        consolidation.add(new ConsolidateAssignsRule());
+        consolidation.add(new ConsolidateAssignsRule(false));
         consolidation.add(new InlineAssignIntoAggregateRule());
         consolidation.add(new RewriteDistinctAggregateRule());
         // The following rule should run after RewriteDistinctAggregateRule
@@ -353,6 +354,7 @@
     public static final List<IAlgebraicRewriteRule> buildPhysicalRewritesAllLevelsRuleCollection() {
         List<IAlgebraicRewriteRule> physicalRewritesAllLevels = new LinkedList<>();
         physicalRewritesAllLevels.add(new PullSelectOutOfEqJoin());
+        physicalRewritesAllLevels.add(new ExtractBatchableExternalFunctionCallsRule());
         //Turned off the following rule for now not to change OptimizerTest results.
         physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
@@ -369,7 +371,7 @@
         physicalRewritesAllLevels.add(new IntroduceMaterializationForInsertWithSelfScanRule());
         physicalRewritesAllLevels.add(new InlineSingleReferenceVariablesRule());
         physicalRewritesAllLevels.add(new RemoveUnusedAssignAndAggregateRule());
-        physicalRewritesAllLevels.add(new ConsolidateAssignsRule());
+        physicalRewritesAllLevels.add(new ConsolidateAssignsRule(true));
         // After adding projects, we may need need to set physical operators again.
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
         return physicalRewritesAllLevels;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ExtractBatchableExternalFunctionCallsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ExtractBatchableExternalFunctionCallsRule.java
new file mode 100644
index 0000000..1e5d805
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ExtractBatchableExternalFunctionCallsRule.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class ExtractBatchableExternalFunctionCallsRule implements IAlgebraicRewriteRule {
+
+    private final ExtractFunctionCallsVisitor extractVisitor = new ExtractFunctionCallsVisitor();
+
+    private Boolean isRuleEnabled;
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        if (isRuleEnabled == null) {
+            isRuleEnabled = SetAsterixPhysicalOperatorsRule.isBatchAssignEnabled(context);
+        }
+        if (!isRuleEnabled) {
+            return false;
+        }
+
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        List<Mutable<ILogicalExpression>> assignTopExprRefs = Collections.emptyList();
+        switch (op.getOperatorTag()) {
+            case ASSIGN:
+                assignTopExprRefs = ((AssignOperator) op).getExpressions();
+                break;
+            case SELECT:
+                break;
+            default:
+                return false;
+        }
+
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+        context.addToDontApplySet(this, op);
+
+        extractVisitor.reset(context, assignTopExprRefs);
+        if (!op.acceptExpressionTransform(extractVisitor)) {
+            return false;
+        }
+        SourceLocation sourceLoc = op.getSourceLocation();
+
+        ILogicalOperator inputOp = op.getInputs().get(0).getValue();
+        for (int i = 0, ln = extractVisitor.assignVars.size(); i < ln; i++) {
+            List<LogicalVariable> assignVarList = extractVisitor.assignVars.get(i);
+            List<Mutable<ILogicalExpression>> assignExprList = extractVisitor.assignExprs.get(i);
+            AssignOperator assignOp = new AssignOperator(assignVarList, assignExprList);
+            assignOp.setSourceLocation(sourceLoc);
+            assignOp.getInputs().add(new MutableObject<>(inputOp));
+            context.computeAndSetTypeEnvironmentForOperator(assignOp);
+            assignOp.recomputeSchema();
+            OperatorPropertiesUtil.markMovable(assignOp, false);
+
+            context.addToDontApplySet(this, assignOp);
+            for (LogicalVariable assignVar : assignVarList) {
+                context.addNotToBeInlinedVar(assignVar);
+            }
+
+            inputOp = assignOp;
+        }
+
+        op.getInputs().clear();
+        op.getInputs().add(new MutableObject<>(inputOp));
+        context.computeAndSetTypeEnvironmentForOperator(op);
+        op.recomputeSchema();
+        return true;
+    }
+
+    private static final class ExtractFunctionCallsVisitor implements ILogicalExpressionReferenceTransform {
+
+        private final List<List<LogicalVariable>> assignVars = new ArrayList<>();
+
+        private final List<List<Mutable<ILogicalExpression>>> assignExprs = new ArrayList<>();
+
+        private final List<LogicalVariable> usedVarList = new ArrayList<>();
+
+        private IOptimizationContext context;
+
+        private List<Mutable<ILogicalExpression>> dontExtractFromExprRefs;
+
+        public void reset(IOptimizationContext context, List<Mutable<ILogicalExpression>> dontExtractFromExprRefs) {
+            this.context = context;
+            this.dontExtractFromExprRefs = dontExtractFromExprRefs;
+            assignVars.clear();
+            assignExprs.clear();
+        }
+
+        @Override
+        public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+            ILogicalExpression expr = exprRef.getValue();
+            switch (expr.getExpressionTag()) {
+                case FUNCTION_CALL:
+                    AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
+                    boolean applied = false;
+                    for (Mutable<ILogicalExpression> argRef : callExpr.getArguments()) {
+                        applied |= transform(argRef);
+                    }
+                    AbstractFunctionCallExpression.FunctionKind fnKind = callExpr.getKind();
+                    IFunctionInfo fnInfo = callExpr.getFunctionInfo();
+                    if (ExternalFunctionCompilerUtil.supportsBatchInvocation(fnKind, fnInfo)
+                            && callExpr.isFunctional()) {
+                        // need to extract non-variable arguments into separate ASSIGNS
+                        // because batched assign can only operate on columns
+                        for (Mutable<ILogicalExpression> argRef : callExpr.getArguments()) {
+                            ILogicalExpression argExpr = argRef.getValue();
+                            if (argExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                                LogicalVariable newArgVar = context.newVar();
+                                VariableReferenceExpression newArgVarRef = new VariableReferenceExpression(newArgVar);
+                                newArgVarRef.setSourceLocation(expr.getSourceLocation());
+                                saveAssignVar(newArgVar, argExpr);
+                                argRef.setValue(newArgVarRef);
+                                applied = true;
+                            }
+                        }
+                        // need extract function call itself into a separate ASSIGN
+                        // (unless it's already a top level expression of the ASSIGN operator we're visiting)
+                        boolean dontExtractExprRef = indexOf(dontExtractFromExprRefs, exprRef) >= 0;
+                        if (!dontExtractExprRef) {
+                            LogicalVariable newVar = context.newVar();
+                            VariableReferenceExpression newVarRef = new VariableReferenceExpression(newVar);
+                            newVarRef.setSourceLocation(expr.getSourceLocation());
+                            saveAssignVar(newVar, expr);
+                            exprRef.setValue(newVarRef);
+                            applied = true;
+                        }
+                    }
+                    return applied;
+                case VARIABLE:
+                case CONSTANT:
+                    return false;
+                default:
+                    throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, expr.getSourceLocation(),
+                            expr.getExpressionTag().toString());
+            }
+        }
+
+        private void saveAssignVar(LogicalVariable var, ILogicalExpression expr) {
+            List<LogicalVariable> assignVarList = null;
+            List<Mutable<ILogicalExpression>> assignExprList = null;
+
+            if (!assignVars.isEmpty()) {
+                usedVarList.clear();
+                expr.getUsedVariables(usedVarList);
+                for (int i = 0, ln = assignVars.size(); i < ln; i++) {
+                    List<LogicalVariable> candidateVarList = assignVars.get(i);
+                    if (OperatorPropertiesUtil.disjoint(candidateVarList, usedVarList)) {
+                        assignVarList = candidateVarList;
+                        assignExprList = assignExprs.get(i);
+                        break;
+                    }
+                }
+            }
+
+            if (assignVarList == null) {
+                // first time, or couldn't find a disjoint var list
+                assignVarList = new ArrayList<>();
+                assignExprList = new ArrayList<>();
+                assignVars.add(assignVarList);
+                assignExprs.add(assignExprList);
+            }
+
+            assignVarList.add(var);
+            assignExprList.add(new MutableObject<>(expr));
+        }
+
+        public static int indexOf(List<Mutable<ILogicalExpression>> exprList, Mutable<ILogicalExpression> exprRef) {
+            return OperatorManipulationUtil.indexOf(exprList,
+                    (listItemExprRef, paramExprRef) -> listItemExprRef == paramExprRef, exprRef);
+        }
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index e662737..ea51fc3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.physical.AssignBatchPOperator;
 import org.apache.asterix.algebra.operators.physical.BTreeSearchPOperator;
 import org.apache.asterix.algebra.operators.physical.InvertedIndexPOperator;
 import org.apache.asterix.algebra.operators.physical.RTreeSearchPOperator;
@@ -30,6 +31,7 @@
 import org.apache.asterix.metadata.declared.DataSourceId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.optimizer.base.AnalysisUtil;
 import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
@@ -49,10 +51,12 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
@@ -69,16 +73,47 @@
 
 public final class SetAsterixPhysicalOperatorsRule extends SetAlgebricksPhysicalOperatorsRule {
 
+    // Disable ASSIGN_BATCH physical operator if this option is set to 'false'
+    public static final String REWRITE_ATTEMPT_BATCH_ASSIGN = "rewrite_attempt_batch_assign";
+    static final boolean REWRITE_ATTEMPT_BATCH_ASSIGN_DEFAULT = false;
+
     @Override
     protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean> createPhysicalOperatorFactoryVisitor(
             IOptimizationContext context) {
         return new AsterixPhysicalOperatorFactoryVisitor(context);
     }
 
+    static boolean isBatchAssignEnabled(IOptimizationContext context) {
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        return metadataProvider.getBooleanProperty(REWRITE_ATTEMPT_BATCH_ASSIGN, REWRITE_ATTEMPT_BATCH_ASSIGN_DEFAULT);
+    }
+
     private static class AsterixPhysicalOperatorFactoryVisitor extends AlgebricksPhysicalOperatorFactoryVisitor {
 
+        private final boolean isBatchAssignEnabled;
+
         private AsterixPhysicalOperatorFactoryVisitor(IOptimizationContext context) {
             super(context);
+            isBatchAssignEnabled = isBatchAssignEnabled(context);
+        }
+
+        @Override
+        public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) throws AlgebricksException {
+            List<Mutable<ILogicalExpression>> exprList = op.getExpressions();
+            boolean batchMode = isBatchAssignEnabled && exprList.size() > 0 && allBatchableFunctionCalls(exprList);
+            if (batchMode) {
+                // disable inlining of variable arguments
+                for (Mutable<ILogicalExpression> exprRef : exprList) {
+                    AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) exprRef.getValue();
+                    for (Mutable<ILogicalExpression> argRef : callExpr.getArguments()) {
+                        LogicalVariable var = ((VariableReferenceExpression) argRef.getValue()).getVariableReference();
+                        context.addNotToBeInlinedVar(var);
+                    }
+                }
+                return new AssignBatchPOperator();
+            } else {
+                return super.visitAssignOperator(op, topLevelOp);
+            }
         }
 
         @Override
@@ -280,5 +315,31 @@
                 return new WindowStreamPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList());
             }
         }
+
+        private boolean allBatchableFunctionCalls(List<Mutable<ILogicalExpression>> exprList)
+                throws CompilationException {
+            for (Mutable<ILogicalExpression> exprRef : exprList) {
+                if (!isBatchableFunctionCall(exprRef.getValue())) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        private static boolean isBatchableFunctionCall(ILogicalExpression expr) throws CompilationException {
+            if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                return false;
+            }
+            AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
+            if (!ExternalFunctionCompilerUtil.supportsBatchInvocation(callExpr.getKind(), callExpr.getFunctionInfo())) {
+                return false;
+            }
+            for (Mutable<ILogicalExpression> argRef : callExpr.getArguments()) {
+                if (argRef.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                    return false;
+                }
+            }
+            return true;
+        }
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 9088db6..d60047e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -72,6 +72,7 @@
 import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
 import org.apache.asterix.optimizer.base.FuzzyUtils;
 import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule;
+import org.apache.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.ExecutionPlans;
@@ -141,7 +142,8 @@
             StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME,
             FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION,
             SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type",
-            DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION);
+            DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION,
+            SetAsterixPhysicalOperatorsRule.REWRITE_ATTEMPT_BATCH_ASSIGN);
 
     private final IRewriterFactory rewriterFactory;
     private final IAstPrintVisitorFactory astPrintVisitorFactory;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java
index 8ac507f..dc51c5f 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java
@@ -18,15 +18,30 @@
  */
 package org.apache.asterix.external.library;
 
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 
 public class ExternalFunctionDescriptorProvider {
 
-    public static IFunctionDescriptor getExternalFunctionDescriptor(IExternalFunctionInfo finfo)
+    public static IExternalFunctionDescriptor resolveExternalFunction(AbstractFunctionCallExpression expr,
+            IVariableTypeEnvironment inputTypeEnv, JobGenContext context) throws AlgebricksException {
+        IExternalFunctionDescriptor fd = getExternalFunctionDescriptor((IExternalFunctionInfo) expr.getFunctionInfo());
+        CompilerProperties props = ((IApplicationContext) context.getAppContext()).getCompilerProperties();
+        FunctionTypeInferers.SET_ARGUMENTS_TYPE.infer(expr, fd, inputTypeEnv, props);
+        fd.setSourceLocation(expr.getSourceLocation());
+        return fd;
+    }
+
+    private static IExternalFunctionDescriptor getExternalFunctionDescriptor(IExternalFunctionInfo finfo)
             throws AlgebricksException {
         switch (finfo.getKind()) {
             case SCALAR:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
index 61ab3ea..63f0a13 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
@@ -19,15 +19,15 @@
 
 package org.apache.asterix.external.library;
 
+import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 
 public class ExternalScalarFunctionDescriptor extends AbstractScalarFunctionDynamicDescriptor
-        implements IFunctionDescriptor {
+        implements IExternalFunctionDescriptor {
 
     private static final long serialVersionUID = 2L;
     private final IExternalFunctionInfo finfo;
@@ -54,4 +54,14 @@
     public FunctionIdentifier getIdentifier() {
         return finfo.getFunctionIdentifier();
     }
+
+    @Override
+    public IExternalFunctionInfo getFunctionInfo() {
+        return finfo;
+    }
+
+    @Override
+    public IAType[] getArgumentTypes() {
+        return argTypes;
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
new file mode 100644
index 0000000..44a17a9
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private int[] outColumns;
+    private final IExternalFunctionDescriptor[] fnDescs;
+    private final int[][] fnArgColumns;
+
+    public ExternalAssignBatchRuntimeFactory(int[] outColumns, IExternalFunctionDescriptor[] fnDescs,
+            int[][] fnArgColumns, int[] projectionList) {
+        super(projectionList);
+        this.outColumns = outColumns;
+        this.fnDescs = fnDescs;
+        this.fnArgColumns = fnArgColumns;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx)
+            throws HyracksDataException {
+        throw new HyracksDataException(ErrorCode.OPERATOR_NOT_IMPLEMENTED, sourceLoc,
+                PhysicalOperatorTag.ASSIGN_BATCH.toString());
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
index 029b13a..f900c92 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.BuiltinTypeMap;
 import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
 import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
@@ -167,4 +168,23 @@
                     String.valueOf(actualSize), language.name());
         }
     }
+
+    public static boolean supportsBatchInvocation(FunctionKind fnKind, IFunctionInfo fnInfo)
+            throws CompilationException {
+        if (fnKind != FunctionKind.SCALAR) {
+            return false;
+        }
+        if (!(fnInfo instanceof IExternalFunctionInfo)) {
+            return false;
+        }
+        ExternalFunctionLanguage language = ((IExternalFunctionInfo) fnInfo).getLanguage();
+        switch (language) {
+            case JAVA:
+                return false;
+            case PYTHON:
+                return false;
+            default:
+                throw new CompilationException(ErrorCode.METADATA_ERROR, language.name());
+        }
+    }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionDescriptor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionDescriptor.java
new file mode 100644
index 0000000..8d362ce
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionDescriptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.functions;
+
+import org.apache.asterix.om.types.IAType;
+
+public interface IExternalFunctionDescriptor extends IFunctionDescriptor {
+
+    IExternalFunctionInfo getFunctionInfo();
+
+    IAType[] getArgumentTypes();
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index b7f6d62..d590f71 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -21,6 +21,7 @@
 public enum PhysicalOperatorTag {
     AGGREGATE,
     ASSIGN,
+    ASSIGN_BATCH,
     BROADCAST_EXCHANGE,
     BTREE_SEARCH,
     BULKLOAD,
@@ -33,7 +34,6 @@
     FORWARD,
     HASH_PARTITION_EXCHANGE,
     HASH_PARTITION_MERGE_EXCHANGE,
-    HDFS_READER,
     HYBRID_HASH_JOIN,
     IN_MEMORY_HASH_JOIN,
     MICRO_STABLE_SORT,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/AbstractAssignPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/AbstractAssignPOperator.java
new file mode 100644
index 0000000..4466a25
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/AbstractAssignPOperator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public abstract class AbstractAssignPOperator extends AbstractPhysicalOperator {
+
+    protected boolean flushFramesRapidly;
+
+    protected String[] locations;
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AssignOperator assignOp = (AssignOperator) op;
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+        if (assignOp.getExplicitOrderingProperty() != null) {
+            deliveredProperties.getLocalProperties().add(assignOp.getExplicitOrderingProperty());
+        }
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        AssignOperator assign = (AssignOperator) op;
+        List<LogicalVariable> variables = assign.getVariables();
+        int[] outColumns = new int[variables.size()];
+        for (int i = 0; i < outColumns.length; i++) {
+            outColumns[i] = opSchema.findVariable(variables.get(i));
+        }
+
+        // TODO push projections into the operator
+        int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
+
+        IPushRuntimeFactory runtime =
+                createRuntimeFactory(context, assign, opSchema, inputSchemas, outColumns, projectionList);
+        runtime.setSourceLocation(assign.getSourceLocation());
+
+        // contribute one Asterix framewriter
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        if (locations != null && locations.length > 0) {
+            AlgebricksAbsolutePartitionConstraint locationConstraint =
+                    new AlgebricksAbsolutePartitionConstraint(locations);
+            builder.contributeMicroOperator(assign, runtime, recDesc, locationConstraint);
+        } else {
+            builder.contributeMicroOperator(assign, runtime, recDesc);
+        }
+        // and contribute one edge from its child
+        ILogicalOperator src = assign.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, assign, 0);
+    }
+
+    protected abstract IPushRuntimeFactory createRuntimeFactory(JobGenContext context, AssignOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, int[] outColumns, int[] projectionList)
+            throws AlgebricksException;
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    public void setRapidFrameFlush(boolean flushFramesRapidly) {
+        this.flushFramesRapidly = flushFramesRapidly;
+    }
+
+    public void setLocationConstraint(String[] locations) {
+        this.locations = locations;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 23fa1ee..5f927fd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -21,105 +21,35 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.AbstractAssignPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
-public class AssignPOperator extends AbstractPhysicalOperator {
-
-    private boolean flushFramesRapidly;
-    private String[] locations;
+public class AssignPOperator extends AbstractAssignPOperator {
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.ASSIGN;
     }
 
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AssignOperator assignOp = (AssignOperator) op;
-        ILogicalOperator op2 = op.getInputs().get(0).getValue();
-        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
-        if (assignOp.getExplicitOrderingProperty() != null) {
-            deliveredProperties.getLocalProperties().add(assignOp.getExplicitOrderingProperty());
-        }
-    }
-
-    @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
-        return emptyUnaryRequirements();
-    }
-
-    @Override
-    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
-            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+    protected IPushRuntimeFactory createRuntimeFactory(JobGenContext context, AssignOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, int[] outColumns, int[] projectionList)
             throws AlgebricksException {
-        AssignOperator assign = (AssignOperator) op;
-        List<LogicalVariable> variables = assign.getVariables();
-        List<Mutable<ILogicalExpression>> expressions = assign.getExpressions();
-        int[] outColumns = new int[variables.size()];
-        for (int i = 0; i < outColumns.length; i++) {
-            outColumns[i] = opSchema.findVariable(variables.get(i));
-        }
+        List<Mutable<ILogicalExpression>> expressions = op.getExpressions();
         IScalarEvaluatorFactory[] evalFactories = new IScalarEvaluatorFactory[expressions.size()];
         IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
         for (int i = 0; i < evalFactories.length; i++) {
             evalFactories[i] = expressionRuntimeProvider.createEvaluatorFactory(expressions.get(i).getValue(),
                     context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
         }
-
-        // TODO push projections into the operator
-        int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
-
-        AssignRuntimeFactory runtime =
-                new AssignRuntimeFactory(outColumns, evalFactories, projectionList, flushFramesRapidly);
-        runtime.setSourceLocation(assign.getSourceLocation());
-
-        // contribute one Asterix framewriter
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
-        if (locations != null && locations.length > 0) {
-            AlgebricksAbsolutePartitionConstraint locationConstraint =
-                    new AlgebricksAbsolutePartitionConstraint(locations);
-            builder.contributeMicroOperator(assign, runtime, recDesc, locationConstraint);
-        } else {
-            builder.contributeMicroOperator(assign, runtime, recDesc);
-        }
-        // and contribute one edge from its child
-        ILogicalOperator src = assign.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src, 0, assign, 0);
-    }
-
-    @Override
-    public boolean isMicroOperator() {
-        return true;
-    }
-
-    public void setRapidFrameFlush(boolean flushFramesRapidly) {
-        this.flushFramesRapidly = flushFramesRapidly;
-    }
-
-    public void setLocationConstraint(String[] locations) {
-        this.locations = locations;
-    }
-
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return false;
+        return new AssignRuntimeFactory(outColumns, evalFactories, projectionList, flushFramesRapidly);
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java
index 3f7a0c1..5e10819 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java
@@ -24,7 +24,6 @@
 import java.util.Deque;
 import java.util.List;
 import java.util.function.BiFunction;
-import java.util.function.BiPredicate;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -34,6 +33,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 
@@ -250,21 +250,11 @@
     }
 
     private static <T> int findItem(List<T> list, T item) {
-        return indexOf(list, (listItem, paramItem) -> listItem == paramItem, item);
+        return OperatorManipulationUtil.indexOf(list, (listItem, paramItem) -> listItem == paramItem, item);
     }
 
     private static <T> int findNonNull(List<T> list) {
-        return indexOf(list, (listItem, none) -> listItem != null, null);
-    }
-
-    private static <T, U> int indexOf(List<T> list, BiPredicate<T, U> predicate, U predicateParam) {
-        for (int i = 0, n = list.size(); i < n; i++) {
-            T listItem = list.get(i);
-            if (predicate.test(listItem, predicateParam)) {
-                return i;
-            }
-        }
-        return -1;
+        return OperatorManipulationUtil.indexOf(list, (listItem, none) -> listItem != null, null);
     }
 
     static String printOperator(Mutable<ILogicalOperator> opRef, IPlanPrettyPrinter printer) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index dd109ff..aa2ecdc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiPredicate;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -385,8 +386,21 @@
      * @return operator position in the given list or {@code -1} if not found
      */
     public static int indexOf(List<Mutable<ILogicalOperator>> list, ILogicalOperator op) {
+        return indexOf(list, (listItemOpRef, paramOp) -> listItemOpRef.getValue() == paramOp, op);
+    }
+
+    /**
+     * Find an item a given list
+     *
+     * @param list list to search in
+     * @param predicate predicate to test
+     * @param predicateParam parameter to pass to the predicate
+     * @return item position in the given list or {@code -1} if not found
+     */
+    public static <T, U> int indexOf(List<T> list, BiPredicate<T, U> predicate, U predicateParam) {
         for (int i = 0, ln = list.size(); i < ln; i++) {
-            if (list.get(i).getValue() == op) {
+            T listItem = list.get(i);
+            if (predicate.test(listItem, predicateParam)) {
                 return i;
             }
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java
index bbb6cbd..bf9ab6a 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java
@@ -35,6 +35,12 @@
 
 public class ConsolidateAssignsRule implements IAlgebraicRewriteRule {
 
+    private final boolean recomputeSchema;
+
+    public ConsolidateAssignsRule(boolean recomputeSchema) {
+        this.recomputeSchema = recomputeSchema;
+    }
+
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
         return false;
@@ -76,6 +82,9 @@
         asgnInpList.clear();
         asgnInpList.add(botOpRef);
         context.computeAndSetTypeEnvironmentForOperator(assign1);
+        if (recomputeSchema) {
+            assign1.recomputeSchema();
+        }
         return true;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 7243827..b95971f 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -190,7 +190,7 @@
         }
 
         @Override
-        public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) {
+        public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) throws AlgebricksException {
             return new AssignPOperator();
         }