diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 8677d0f..6a11abf 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -132,7 +132,6 @@
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnnecessarySortMergeExchange;
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ReuseWindowAggregateRule;
-import org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SimpleUnnestToProductRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SwitchInnerJoinBranchRule;
@@ -359,7 +358,6 @@
         physicalRewritesAllLevels.add(new PullSelectOutOfEqJoin());
         //Turned off the following rule for now not to change OptimizerTest results.
         physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
-        physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
         physicalRewritesAllLevels.add(new CheckFullParallelSortRule());
@@ -373,7 +371,7 @@
         physicalRewritesAllLevels.add(new RemoveUnusedAssignAndAggregateRule());
         physicalRewritesAllLevels.add(new ConsolidateAssignsRule());
         // After adding projects, we may need need to set physical operators again.
-        physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
+        physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
         return physicalRewritesAllLevels;
     }
 
@@ -390,7 +388,7 @@
         // remove assigns that could become unused after PushLimitIntoPrimarySearchRule
         physicalRewritesTopLevel.add(new RemoveUnusedAssignAndAggregateRule());
         physicalRewritesTopLevel.add(new IntroduceProjectsRule());
-        physicalRewritesTopLevel.add(new SetAlgebricksPhysicalOperatorsRule());
+        physicalRewritesTopLevel.add(new SetAsterixPhysicalOperatorsRule());
         physicalRewritesTopLevel.add(new IntroduceRapidFrameFlushProjectAssignRule());
         physicalRewritesTopLevel.add(new SetExecutionModeRule());
         physicalRewritesTopLevel.add(new IntroduceRandomPartitioningFeedComputationRule());
@@ -400,7 +398,7 @@
     public static final List<IAlgebraicRewriteRule> prepareForJobGenRuleCollection() {
         List<IAlgebraicRewriteRule> prepareForJobGenRewrites = new LinkedList<>();
         prepareForJobGenRewrites.add(new InsertProjectBeforeUnionRule());
-        prepareForJobGenRewrites.add(new SetAlgebricksPhysicalOperatorsRule());
+        prepareForJobGenRewrites.add(new SetAsterixPhysicalOperatorsRule());
         prepareForJobGenRewrites
                 .add(new IsolateHyracksOperatorsRule(HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));
         prepareForJobGenRewrites.add(new FixReplicateOperatorOutputsRule());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 4314b3a..b26eaca 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -37,349 +37,223 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
 
-public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule {
+public final class SetAsterixPhysicalOperatorsRule extends SetAlgebricksPhysicalOperatorsRule {
 
     @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        return false;
+    protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean> createPhysicalOperatorFactoryVisitor(
+            IOptimizationContext context) {
+        return new AsterixPhysicalOperatorFactoryVisitor(context);
     }
 
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (context.checkIfInDontApplySet(this, op)) {
-            return false;
+    private static class AsterixPhysicalOperatorFactoryVisitor extends AlgebricksPhysicalOperatorFactoryVisitor {
+
+        private AsterixPhysicalOperatorFactoryVisitor(IOptimizationContext context) {
+            super(context);
         }
 
-        computeDefaultPhysicalOp(op, true, context);
-        context.addToDontApplySet(this, op);
-        return true;
-    }
+        @Override
+        public ExternalGroupByPOperator createExternalGroupByPOperator(GroupByOperator gby) throws AlgebricksException {
+            Mutable<ILogicalOperator> r0 = gby.getNestedPlans().get(0).getRoots().get(0);
+            if (!r0.getValue().getOperatorTag().equals(LogicalOperatorTag.AGGREGATE)) {
+                return null;
+            }
+            AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+            boolean serializable = aggOp.getExpressions().stream()
+                    .allMatch(exprRef -> exprRef.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL
+                            && BuiltinFunctions.isAggregateFunctionSerializable(
+                                    ((AbstractFunctionCallExpression) exprRef.getValue()).getFunctionIdentifier()));
+            if (!serializable) {
+                return null;
+            }
 
-    private static void setPhysicalOperators(ILogicalPlan plan, boolean topLevelOp, IOptimizationContext context)
-            throws AlgebricksException {
-        for (Mutable<ILogicalOperator> root : plan.getRoots()) {
-            computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), topLevelOp, context);
+            // if serializable, use external group-by
+            // now check whether the serialized version aggregation function has corresponding intermediate agg
+            IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
+                    context.getMergeAggregationExpressionFactory();
+            List<LogicalVariable> originalVariables = aggOp.getVariables();
+            List<Mutable<ILogicalExpression>> aggExprs = aggOp.getExpressions();
+            int aggNum = aggExprs.size();
+            for (int i = 0; i < aggNum; i++) {
+                AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) aggExprs.get(i).getValue();
+                AggregateFunctionCallExpression serialAggExpr = BuiltinFunctions
+                        .makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(), expr.getArguments());
+                serialAggExpr.setSourceLocation(expr.getSourceLocation());
+                if (mergeAggregationExpressionFactory.createMergeAggregation(originalVariables.get(i), serialAggExpr,
+                        context) == null) {
+                    return null;
+                }
+            }
+
+            // Check whether there are multiple aggregates in the sub plan.
+            // Currently, we don't support multiple aggregates in one external group-by.
+            ILogicalOperator r1Logical = aggOp;
+            while (r1Logical.hasInputs()) {
+                r1Logical = r1Logical.getInputs().get(0).getValue();
+                if (r1Logical.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                    return null;
+                }
+            }
+
+            for (int i = 0; i < aggNum; i++) {
+                AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) aggExprs.get(i).getValue();
+                AggregateFunctionCallExpression serialAggExpr = BuiltinFunctions
+                        .makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(), expr.getArguments());
+                serialAggExpr.setSourceLocation(expr.getSourceLocation());
+                aggOp.getExpressions().get(i).setValue(serialAggExpr);
+            }
+
+            generateMergeAggregationExpressions(gby);
+
+            return new ExternalGroupByPOperator(gby.getGroupByVarList(),
+                    physicalOptimizationConfig.getMaxFramesForGroupBy(),
+                    (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
+                            * physicalOptimizationConfig.getFrameSize());
         }
-    }
 
-    private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
-            IOptimizationContext context) throws AlgebricksException {
-        PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
-        if (op.getOperatorTag().equals(LogicalOperatorTag.GROUP)) {
-            GroupByOperator gby = (GroupByOperator) op;
-            if (gby.getNestedPlans().size() == 1) {
-                ILogicalPlan p0 = gby.getNestedPlans().get(0);
-                if (p0.getRoots().size() == 1) {
-                    Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
-                    if (r0.getValue().getOperatorTag().equals(LogicalOperatorTag.AGGREGATE)) {
-                        AggregateOperator aggOp = (AggregateOperator) r0.getValue();
-                        boolean serializable = true;
-                        for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
-                            AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef.getValue();
-                            if (!BuiltinFunctions.isAggregateFunctionSerializable(expr.getFunctionIdentifier())) {
-                                serializable = false;
-                                break;
-                            }
-                        }
+        private void generateMergeAggregationExpressions(GroupByOperator gby) throws AlgebricksException {
+            if (gby.getNestedPlans().size() != 1) {
+                throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
+                        "External group-by currently works only for one nested plan with one root containing"
+                                + "an aggregate and a nested-tuple-source.");
+            }
+            ILogicalPlan p0 = gby.getNestedPlans().get(0);
+            if (p0.getRoots().size() != 1) {
+                throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
+                        "External group-by currently works only for one nested plan with one root containing"
+                                + "an aggregate and a nested-tuple-source.");
+            }
+            IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
+                    context.getMergeAggregationExpressionFactory();
+            Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+            AbstractLogicalOperator r0Logical = (AbstractLogicalOperator) r0.getValue();
+            if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+                throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
+                        "The merge aggregation expression generation should not process a " + r0Logical.getOperatorTag()
+                                + " operator.");
+            }
+            AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+            List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
+            List<LogicalVariable> aggProducedVars = aggOp.getVariables();
+            int n = aggOp.getExpressions().size();
+            List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<>();
+            for (int i = 0; i < n; i++) {
+                ILogicalExpression aggFuncExpr = aggFuncRefs.get(i).getValue();
+                ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
+                        .createMergeAggregation(aggProducedVars.get(i), aggFuncExpr, context);
+                if (mergeExpr == null) {
+                    throw new CompilationException(ErrorCode.COMPILATION_ERROR, aggFuncExpr.getSourceLocation(),
+                            "The aggregation function "
+                                    + ((AbstractFunctionCallExpression) aggFuncExpr).getFunctionIdentifier().getName()
+                                    + " does not have a registered intermediate aggregation function.");
+                }
+                mergeExpressionRefs.add(new MutableObject<>(mergeExpr));
+            }
+            aggOp.setMergeExpressions(mergeExpressionRefs);
+        }
 
-                        if ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
-                                .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
-                            boolean setToExternalGby = false;
-                            if (serializable) {
-                                // if serializable, use external group-by
-                                // now check whether the serialized version aggregation function has corresponding intermediate agg
-                                boolean hasIntermediateAgg = true;
-                                IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
-                                        context.getMergeAggregationExpressionFactory();
-                                List<LogicalVariable> originalVariables = aggOp.getVariables();
-                                List<Mutable<ILogicalExpression>> aggExprs = aggOp.getExpressions();
-                                int aggNum = aggExprs.size();
-                                for (int i = 0; i < aggNum; i++) {
-                                    AbstractFunctionCallExpression expr =
-                                            (AbstractFunctionCallExpression) aggExprs.get(i).getValue();
-                                    AggregateFunctionCallExpression serialAggExpr =
-                                            BuiltinFunctions.makeSerializableAggregateFunctionExpression(
-                                                    expr.getFunctionIdentifier(), expr.getArguments());
-                                    serialAggExpr.setSourceLocation(expr.getSourceLocation());
-                                    if (mergeAggregationExpressionFactory.createMergeAggregation(
-                                            originalVariables.get(i), serialAggExpr, context) == null) {
-                                        hasIntermediateAgg = false;
-                                        break;
-                                    }
-                                }
+        @Override
+        public IPhysicalOperator visitUnnestMapOperator(UnnestMapOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            return visitAbstractUnnestMapOperator(op);
+        }
 
-                                // Check whether there are multiple aggregates in the sub plan.
-                                // Currently, we don't support multiple aggregates in one external group-by.
-                                boolean multipleAggOpsFound = false;
-                                ILogicalOperator r1Logical = aggOp;
-                                while (r1Logical.hasInputs()) {
-                                    r1Logical = r1Logical.getInputs().get(0).getValue();
-                                    if (r1Logical.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
-                                        multipleAggOpsFound = true;
-                                        break;
-                                    }
-                                }
+        @Override
+        public IPhysicalOperator visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            return visitAbstractUnnestMapOperator(op);
+        }
 
-                                if (hasIntermediateAgg && !multipleAggOpsFound) {
-                                    for (int i = 0; i < aggNum; i++) {
-                                        AbstractFunctionCallExpression expr =
-                                                (AbstractFunctionCallExpression) aggExprs.get(i).getValue();
-                                        AggregateFunctionCallExpression serialAggExpr =
-                                                BuiltinFunctions.makeSerializableAggregateFunctionExpression(
-                                                        expr.getFunctionIdentifier(), expr.getArguments());
-                                        serialAggExpr.setSourceLocation(expr.getSourceLocation());
-                                        aggOp.getExpressions().get(i).setValue(serialAggExpr);
-                                    }
-                                    ExternalGroupByPOperator externalGby =
-                                            new ExternalGroupByPOperator(gby.getGroupByVarList(),
-                                                    physicalOptimizationConfig.getMaxFramesForGroupBy(),
-                                                    (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
-                                                            * physicalOptimizationConfig.getFrameSize());
-                                    generateMergeAggregationExpressions(gby, context);
-                                    op.setPhysicalOperator(externalGby);
-                                    setToExternalGby = true;
-                                }
-                            }
-
-                            if (!setToExternalGby) {
-                                // if not serializable or no intermediate agg, use pre-clustered group-by
-                                List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
-                                List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
-                                for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
-                                    ILogicalExpression expr = p.second.getValue();
-                                    if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                                        VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
-                                        columnList.add(varRef.getVariableReference());
-                                    }
-                                }
-                                op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
-                                        context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
-                            }
-                        }
-                    } else if (r0.getValue().getOperatorTag().equals(LogicalOperatorTag.RUNNINGAGGREGATE)) {
-                        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
-                        List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
-                        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
-                            ILogicalExpression expr = p.second.getValue();
-                            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                                VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
-                                columnList.add(varRef.getVariableReference());
-                            }
-                        }
-                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
-                                context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
-                    } else {
-                        throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
-                                "Unsupported nested operator within a group-by: "
-                                        + r0.getValue().getOperatorTag().name());
-                    }
+        private IPhysicalOperator visitAbstractUnnestMapOperator(AbstractUnnestMapOperator op)
+                throws AlgebricksException {
+            ILogicalExpression unnestExpr = op.getExpressionRef().getValue();
+            if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation());
+            }
+            AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
+            if (!f.getFunctionIdentifier().equals(BuiltinFunctions.INDEX_SEARCH)) {
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation());
+            }
+            AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
+            jobGenParams.readFromFuncArgs(f.getArguments());
+            MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
+            DataSourceId dataSourceId =
+                    new DataSourceId(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
+            Dataset dataset = mp.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
+            IDataSourceIndex<String, DataSourceId> dsi =
+                    mp.findDataSourceIndex(jobGenParams.getIndexName(), dataSourceId);
+            INodeDomain storageDomain = mp.findNodeDomain(dataset.getNodeGroupName());
+            if (dsi == null) {
+                throw new CompilationException(ErrorCode.COMPILATION_ERROR, op.getSourceLocation(),
+                        "Could not find index " + jobGenParams.getIndexName() + " for dataset " + dataSourceId);
+            }
+            IndexType indexType = jobGenParams.getIndexType();
+            boolean requiresBroadcast = jobGenParams.getRequiresBroadcast();
+            switch (indexType) {
+                case BTREE: {
+                    BTreeJobGenParams btreeJobGenParams = new BTreeJobGenParams();
+                    btreeJobGenParams.readFromFuncArgs(f.getArguments());
+                    return new BTreeSearchPOperator(dsi, storageDomain, requiresBroadcast,
+                            btreeJobGenParams.isPrimaryIndex(), btreeJobGenParams.isEqCondition(),
+                            btreeJobGenParams.getLowKeyVarList(), btreeJobGenParams.getHighKeyVarList());
+                }
+                case RTREE: {
+                    return new RTreeSearchPOperator(dsi, storageDomain, requiresBroadcast);
+                }
+                case SINGLE_PARTITION_WORD_INVIX:
+                case SINGLE_PARTITION_NGRAM_INVIX: {
+                    return new InvertedIndexPOperator(dsi, storageDomain, requiresBroadcast, false);
+                }
+                case LENGTH_PARTITIONED_WORD_INVIX:
+                case LENGTH_PARTITIONED_NGRAM_INVIX: {
+                    return new InvertedIndexPOperator(dsi, storageDomain, requiresBroadcast, true);
+                }
+                default: {
+                    throw AlgebricksException.create(
+                            org.apache.hyracks.api.exceptions.ErrorCode.OPERATOR_NOT_IMPLEMENTED,
+                            op.getSourceLocation(), op.getOperatorTag().toString() + " with " + indexType + " index");
                 }
             }
         }
-        if (op.getPhysicalOperator() == null) {
-            switch (op.getOperatorTag()) {
-                case INNERJOIN: {
-                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, context);
-                    break;
-                }
-                case LEFTOUTERJOIN: {
-                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, topLevelOp, context);
-                    break;
-                }
-                case UNNEST_MAP:
-                case LEFT_OUTER_UNNEST_MAP: {
-                    ILogicalExpression unnestExpr = null;
-                    unnestExpr = ((AbstractUnnestMapOperator) op).getExpressionRef().getValue();
-                    if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
-                        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
-                        FunctionIdentifier fid = f.getFunctionIdentifier();
-                        if (!fid.equals(BuiltinFunctions.INDEX_SEARCH)) {
-                            throw new IllegalStateException();
-                        }
-                        AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
-                        jobGenParams.readFromFuncArgs(f.getArguments());
-                        MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
-                        DataSourceId dataSourceId =
-                                new DataSourceId(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
-                        Dataset dataset =
-                                mp.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
-                        IDataSourceIndex<String, DataSourceId> dsi =
-                                mp.findDataSourceIndex(jobGenParams.getIndexName(), dataSourceId);
-                        INodeDomain storageDomain = mp.findNodeDomain(dataset.getNodeGroupName());
-                        if (dsi == null) {
-                            throw new CompilationException(ErrorCode.COMPILATION_ERROR, op.getSourceLocation(),
-                                    "Could not find index " + jobGenParams.getIndexName() + " for dataset "
-                                            + dataSourceId);
-                        }
-                        IndexType indexType = jobGenParams.getIndexType();
-                        boolean requiresBroadcast = jobGenParams.getRequiresBroadcast();
-                        switch (indexType) {
-                            case BTREE: {
-                                BTreeJobGenParams btreeJobGenParams = new BTreeJobGenParams();
-                                btreeJobGenParams.readFromFuncArgs(f.getArguments());
-                                op.setPhysicalOperator(new BTreeSearchPOperator(dsi, storageDomain, requiresBroadcast,
-                                        btreeJobGenParams.isPrimaryIndex(), btreeJobGenParams.isEqCondition(),
-                                        btreeJobGenParams.getLowKeyVarList(), btreeJobGenParams.getHighKeyVarList()));
-                                break;
-                            }
-                            case RTREE: {
-                                op.setPhysicalOperator(new RTreeSearchPOperator(dsi, storageDomain, requiresBroadcast));
-                                break;
-                            }
-                            case SINGLE_PARTITION_WORD_INVIX:
-                            case SINGLE_PARTITION_NGRAM_INVIX: {
-                                op.setPhysicalOperator(
-                                        new InvertedIndexPOperator(dsi, storageDomain, requiresBroadcast, false));
-                                break;
-                            }
-                            case LENGTH_PARTITIONED_WORD_INVIX:
-                            case LENGTH_PARTITIONED_NGRAM_INVIX: {
-                                op.setPhysicalOperator(
-                                        new InvertedIndexPOperator(dsi, storageDomain, requiresBroadcast, true));
-                                break;
-                            }
-                            default: {
-                                throw new NotImplementedException(indexType + " indexes are not implemented.");
-                            }
-                        }
-                    }
-                    break;
-                }
-                case WINDOW: {
-                    WindowOperator winOp = (WindowOperator) op;
-                    WindowPOperator physOp = createWindowPOperator(winOp, context);
-                    op.setPhysicalOperator(physOp);
-                    break;
-                }
-            }
-        }
-        if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
-            for (ILogicalPlan p : nested.getNestedPlans()) {
-                setPhysicalOperators(p, false, context);
-            }
-        }
-        for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
-            computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), topLevelOp, context);
+
+        @Override
+        public WindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException {
+            boolean partitionMaterialization = winOp.hasNestedPlans() || AnalysisUtil.hasFunctionWithProperty(winOp,
+                    BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
+            boolean frameStartIsMonotonic = AnalysisUtil
+                    .isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions());
+            boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
+                    winOp.getFrameValueExpressions());
+            boolean nestedTrivialAggregates = winOp.hasNestedPlans()
+                    && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
+
+            return new WindowPOperator(winOp.getPartitionVarList(), partitionMaterialization,
+                    winOp.getOrderColumnList(), frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates,
+                    context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
         }
     }
-
-    private static void generateMergeAggregationExpressions(GroupByOperator gby, IOptimizationContext context)
-            throws AlgebricksException {
-        if (gby.getNestedPlans().size() != 1) {
-            throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
-                    "External group-by currently works only for one nested plan with one root containing"
-                            + "an aggregate and a nested-tuple-source.");
-        }
-        ILogicalPlan p0 = gby.getNestedPlans().get(0);
-        if (p0.getRoots().size() != 1) {
-            throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
-                    "External group-by currently works only for one nested plan with one root containing"
-                            + "an aggregate and a nested-tuple-source.");
-        }
-        IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
-                context.getMergeAggregationExpressionFactory();
-        Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
-        AbstractLogicalOperator r0Logical = (AbstractLogicalOperator) r0.getValue();
-        if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
-            throw new CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
-                    "The merge aggregation expression generation should not process a " + r0Logical.getOperatorTag()
-                            + " operator.");
-        }
-        AggregateOperator aggOp = (AggregateOperator) r0.getValue();
-        List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
-        List<LogicalVariable> aggProducedVars = aggOp.getVariables();
-        int n = aggOp.getExpressions().size();
-        List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
-        for (int i = 0; i < n; i++) {
-            ILogicalExpression aggFuncExpr = aggFuncRefs.get(i).getValue();
-            ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
-                    .createMergeAggregation(aggProducedVars.get(i), aggFuncExpr, context);
-            if (mergeExpr == null) {
-                throw new CompilationException(ErrorCode.COMPILATION_ERROR, aggFuncExpr.getSourceLocation(),
-                        "The aggregation function "
-                                + ((AbstractFunctionCallExpression) aggFuncExpr).getFunctionIdentifier().getName()
-                                + " does not have a registered intermediate aggregation function.");
-            }
-            mergeExpressionRefs.add(new MutableObject<>(mergeExpr));
-        }
-        aggOp.setMergeExpressions(mergeExpressionRefs);
-    }
-
-    private static WindowPOperator createWindowPOperator(WindowOperator winOp, IOptimizationContext context)
-            throws CompilationException {
-        List<Mutable<ILogicalExpression>> partitionExprs = winOp.getPartitionExpressions();
-        List<LogicalVariable> partitionColumns = new ArrayList<>(partitionExprs.size());
-        for (Mutable<ILogicalExpression> pe : partitionExprs) {
-            ILogicalExpression partExpr = pe.getValue();
-            if (partExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, winOp.getSourceLocation(),
-                        "Window partition/order expression has not been normalized");
-            }
-            LogicalVariable var = ((VariableReferenceExpression) partExpr).getVariableReference();
-            partitionColumns.add(var);
-        }
-        List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprs = winOp.getOrderExpressions();
-        List<OrderColumn> orderColumns = new ArrayList<>(orderExprs.size());
-        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExprs) {
-            ILogicalExpression orderExpr = p.second.getValue();
-            if (orderExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, winOp.getSourceLocation(),
-                        "Window partition/order expression has not been normalized");
-            }
-            LogicalVariable var = ((VariableReferenceExpression) orderExpr).getVariableReference();
-            orderColumns.add(new OrderColumn(var, p.first.getKind()));
-        }
-
-        boolean partitionMaterialization = winOp.hasNestedPlans() || AnalysisUtil.hasFunctionWithProperty(winOp,
-                BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
-        boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(),
-                winOp.getFrameValueExpressions());
-        boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
-                winOp.getFrameValueExpressions());
-        boolean nestedTrivialAggregates = winOp.hasNestedPlans()
-                && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
-
-        int memSizeInFrames = context.getPhysicalOptimizationConfig().getMaxFramesForWindow();
-
-        return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic,
-                frameEndIsMonotonic, nestedTrivialAggregates, memSizeInFrames);
-    }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
index 0235dad..db2290c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
@@ -29,9 +29,12 @@
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
@@ -375,4 +378,28 @@
     public boolean requiresVariableReferenceExpressions() {
         return false;
     }
+
+    public List<LogicalVariable> getPartitionVarList() {
+        List<LogicalVariable> varList = new ArrayList<>(partitionExpressions.size());
+        for (Mutable<ILogicalExpression> pe : partitionExpressions) {
+            ILogicalExpression partExpr = pe.getValue();
+            if (partExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                LogicalVariable var = ((VariableReferenceExpression) partExpr).getVariableReference();
+                varList.add(var);
+            }
+        }
+        return varList;
+    }
+
+    public List<OrderColumn> getOrderColumnList() {
+        List<OrderColumn> orderColumns = new ArrayList<>(orderExpressions.size());
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) {
+            ILogicalExpression orderExpr = p.second.getValue();
+            if (orderExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                LogicalVariable var = ((VariableReferenceExpression) orderExpr).getVariableReference();
+                orderColumns.add(new OrderColumn(var, p.first.getKind()));
+            }
+        }
+        return orderColumns;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 1d5a7e9..49e5a0b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -18,10 +18,10 @@
  */
 package org.apache.hyracks.algebricks.rewriter.rules;
 
-import static org.apache.hyracks.api.exceptions.ErrorCode.ORDER_EXPR_NOT_NORMALIZED;
-
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.function.Function;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -32,6 +32,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -42,17 +43,41 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
@@ -88,10 +113,13 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.TokenizePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.UnionAllPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.UnnestPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.WriteResultPOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 
 public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule {
 
@@ -105,385 +133,476 @@
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        // if (context.checkIfInDontApplySet(this, op)) {
-        // return false;
-        // }
         if (op.getPhysicalOperator() != null) {
             return false;
         }
-
-        computeDefaultPhysicalOp(op, true, context);
-        // context.addToDontApplySet(this, op);
+        computeDefaultPhysicalOp(op, true, createPhysicalOperatorFactoryVisitor(context));
         return true;
     }
 
-    private static void setPhysicalOperators(ILogicalPlan plan, boolean topLevelOp, IOptimizationContext context)
-            throws AlgebricksException {
-        for (Mutable<ILogicalOperator> root : plan.getRoots()) {
-            computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), topLevelOp, context);
-        }
-    }
-
     private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
-            IOptimizationContext context) throws AlgebricksException {
-        PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
+            ILogicalOperatorVisitor<IPhysicalOperator, Boolean> physOpFactory) throws AlgebricksException {
         if (op.getPhysicalOperator() == null) {
-            switch (op.getOperatorTag()) {
-                case AGGREGATE: {
-                    op.setPhysicalOperator(new AggregatePOperator());
-                    break;
-                }
-                case ASSIGN: {
-                    op.setPhysicalOperator(new AssignPOperator());
-                    break;
-                }
-                case DISTINCT: {
-                    DistinctOperator distinct = (DistinctOperator) op;
-                    if (topLevelOp) {
-                        distinct.setPhysicalOperator(new PreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
-                    } else {
-                        distinct.setPhysicalOperator(
-                                new MicroPreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
-                    }
-                    break;
-                }
-                case EMPTYTUPLESOURCE: {
-                    op.setPhysicalOperator(new EmptyTupleSourcePOperator());
-                    break;
-                }
-                case EXCHANGE: {
-                    if (op.getPhysicalOperator() == null) {
-                        throw new AlgebricksException("Implementation for EXCHANGE operator was not set.");
-                    }
-                    // implem. choice for exchange should be set by a parent op.
-                    break;
-                }
-                case GROUP: {
-                    GroupByOperator gby = (GroupByOperator) op;
-
-                    if (gby.getNestedPlans().size() == 1) {
-                        ILogicalPlan p0 = gby.getNestedPlans().get(0);
-                        if (p0.getRoots().size() == 1) {
-                            if ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE)
-                                    || (gby.getAnnotations()
-                                            .get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
-                                if (!topLevelOp) {
-                                    throw new NotImplementedException(
-                                            "External hash group-by for nested grouping is not implemented.");
-                                }
-
-                                boolean hasIntermediateAgg = generateMergeAggregationExpressions(gby, context);
-                                if (hasIntermediateAgg) {
-                                    ExternalGroupByPOperator externalGby =
-                                            new ExternalGroupByPOperator(gby.getGroupByVarList(),
-                                                    physicalOptimizationConfig.getMaxFramesForGroupBy(),
-                                                    (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
-                                                            * physicalOptimizationConfig.getFrameSize());
-                                    op.setPhysicalOperator(externalGby);
-                                    break;
-                                }
-                            }
-                        }
-                    }
-
-                    if (topLevelOp) {
-                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(gby.getGroupByVarList(),
-                                gby.isGroupAll(), context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
-                    } else {
-                        op.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
-                                context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
-                    }
-                    break;
-                }
-                case INNERJOIN: {
-                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, context);
-                    break;
-                }
-                case LEFTOUTERJOIN: {
-                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, topLevelOp, context);
-                    break;
-                }
-                case LIMIT: {
-                    op.setPhysicalOperator(new StreamLimitPOperator());
-                    break;
-                }
-                case NESTEDTUPLESOURCE: {
-                    op.setPhysicalOperator(new NestedTupleSourcePOperator());
-                    break;
-                }
-                case ORDER: {
-                    OrderOperator oo = (OrderOperator) op;
-                    for (Pair<IOrder, Mutable<ILogicalExpression>> p : oo.getOrderExpressions()) {
-                        ILogicalExpression e = p.second.getValue();
-                        if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                            throw AlgebricksException.create(ORDER_EXPR_NOT_NORMALIZED, e.getSourceLocation());
-                        }
-                    }
-                    if (topLevelOp) {
-                        op.setPhysicalOperator(new StableSortPOperator(
-                                physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK()));
-                    } else {
-                        op.setPhysicalOperator(new InMemoryStableSortPOperator());
-                    }
-                    break;
-                }
-                case PROJECT: {
-                    op.setPhysicalOperator(new StreamProjectPOperator());
-                    break;
-                }
-                case RUNNINGAGGREGATE: {
-                    op.setPhysicalOperator(new RunningAggregatePOperator());
-                    break;
-                }
-                case REPLICATE: {
-                    op.setPhysicalOperator(new ReplicatePOperator());
-                    break;
-                }
-                case SPLIT:
-                    op.setPhysicalOperator(new SplitPOperator());
-                    break;
-                case SCRIPT: {
-                    op.setPhysicalOperator(new StringStreamingScriptPOperator());
-                    break;
-                }
-                case SELECT: {
-                    op.setPhysicalOperator(new StreamSelectPOperator());
-                    break;
-                }
-                case SUBPLAN: {
-                    op.setPhysicalOperator(new SubplanPOperator());
-                    break;
-                }
-                case UNIONALL: {
-                    if (topLevelOp) {
-                        op.setPhysicalOperator(new UnionAllPOperator());
-                    } else {
-                        op.setPhysicalOperator(new MicroUnionAllPOperator());
-                    }
-                    break;
-                }
-                case INTERSECT: {
-                    if (topLevelOp) {
-                        op.setPhysicalOperator(new IntersectPOperator());
-                    } else {
-                        throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag());
-                    }
-                    break;
-                }
-                case UNNEST: {
-                    op.setPhysicalOperator(new UnnestPOperator());
-                    break;
-                }
-                case LEFT_OUTER_UNNEST:
-                    op.setPhysicalOperator(new LeftOuterUnnestPOperator());
-                    break;
-                case DATASOURCESCAN: {
-                    DataSourceScanOperator scan = (DataSourceScanOperator) op;
-                    IDataSource dataSource = scan.getDataSource();
-                    DataSourceScanPOperator dss = new DataSourceScanPOperator(dataSource);
-                    if (dataSource.isScanAccessPathALeaf()) {
-                        dss.disableJobGenBelowMe();
-                    }
-                    op.setPhysicalOperator(dss);
-                    break;
-                }
-                case WRITE: {
-                    op.setPhysicalOperator(new SinkWritePOperator());
-                    break;
-                }
-                case DISTRIBUTE_RESULT: {
-                    op.setPhysicalOperator(new DistributeResultPOperator());
-                    break;
-                }
-                case WRITE_RESULT: {
-                    WriteResultOperator opLoad = (WriteResultOperator) op;
-                    LogicalVariable payload;
-                    List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
-                    List<LogicalVariable> additionalFilteringKeys = null;
-                    payload = getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getKeyExpressions(), keys);
-                    if (opLoad.getAdditionalFilteringExpressions() != null) {
-                        additionalFilteringKeys = new ArrayList<LogicalVariable>();
-                        getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
-                    }
-                    op.setPhysicalOperator(
-                            new WriteResultPOperator(opLoad.getDataSource(), payload, keys, additionalFilteringKeys));
-                    break;
-                }
-                case INSERT_DELETE_UPSERT: {
-                    // Primary index
-                    InsertDeleteUpsertOperator opLoad = (InsertDeleteUpsertOperator) op;
-                    LogicalVariable payload;
-                    List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
-                    List<LogicalVariable> additionalFilteringKeys = null;
-                    List<LogicalVariable> additionalNonFilterVariables = null;
-                    if (opLoad.getAdditionalNonFilteringExpressions() != null) {
-                        additionalNonFilterVariables = new ArrayList<LogicalVariable>();
-                        getKeys(opLoad.getAdditionalNonFilteringExpressions(), additionalNonFilterVariables);
-                    }
-                    payload = getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getPrimaryKeyExpressions(), keys);
-                    if (opLoad.getAdditionalFilteringExpressions() != null) {
-                        additionalFilteringKeys = new ArrayList<LogicalVariable>();
-                        getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
-                    }
-                    if (opLoad.isBulkload()) {
-                        op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys,
-                                additionalNonFilterVariables, opLoad.getDataSource()));
-                    } else {
-                        op.setPhysicalOperator(new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys,
-                                opLoad.getDataSource(), opLoad.getOperation(), additionalNonFilterVariables));
-                    }
-                    break;
-                }
-                case INDEX_INSERT_DELETE_UPSERT: {
-                    // Secondary index
-                    IndexInsertDeleteUpsertOperator opInsDel = (IndexInsertDeleteUpsertOperator) op;
-                    List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
-                    List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
-                    List<LogicalVariable> additionalFilteringKeys = null;
-                    getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
-                    getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
-                    if (opInsDel.getAdditionalFilteringExpressions() != null) {
-                        additionalFilteringKeys = new ArrayList<LogicalVariable>();
-                        getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
-                    }
-                    if (opInsDel.isBulkload()) {
-                        op.setPhysicalOperator(
-                                new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
-                                        opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
-                    } else {
-                        LogicalVariable upsertIndicatorVar = null;
-                        List<LogicalVariable> prevSecondaryKeys = null;
-                        LogicalVariable prevAdditionalFilteringKey = null;
-                        if (opInsDel.getOperation() == Kind.UPSERT) {
-                            upsertIndicatorVar = getKey(opInsDel.getUpsertIndicatorExpr().getValue());
-                            prevSecondaryKeys = new ArrayList<LogicalVariable>();
-                            getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
-                            if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
-                                prevAdditionalFilteringKey =
-                                        ((VariableReferenceExpression) (opInsDel.getPrevAdditionalFilteringExpression())
-                                                .getValue()).getVariableReference();
-                            }
-                        }
-                        op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
-                                additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
-                                upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKey,
-                                opInsDel.getNumberOfAdditionalNonFilteringFields()));
-                    }
-                    break;
-
-                }
-                case TOKENIZE: {
-                    TokenizeOperator opTokenize = (TokenizeOperator) op;
-                    List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
-                    List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
-                    getKeys(opTokenize.getPrimaryKeyExpressions(), primaryKeys);
-                    getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
-                    // Tokenize Operator only operates with a bulk load on a data set with an index
-                    if (opTokenize.isBulkload()) {
-                        op.setPhysicalOperator(
-                                new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize.getDataSourceIndex()));
-                    }
-                    break;
-                }
-                case SINK: {
-                    op.setPhysicalOperator(new SinkPOperator());
-                    break;
-                }
-                case FORWARD:
-                    op.setPhysicalOperator(new SortForwardPOperator());
-                    break;
+            IPhysicalOperator physOp = op.accept(physOpFactory, topLevelOp);
+            if (physOp == null) {
+                throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET, op.getSourceLocation(),
+                        op.getOperatorTag());
             }
+            op.setPhysicalOperator(physOp);
         }
         if (op.hasNestedPlans()) {
             AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
             for (ILogicalPlan p : nested.getNestedPlans()) {
-                setPhysicalOperators(p, false, context);
+                for (Mutable<ILogicalOperator> root : p.getRoots()) {
+                    computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), false, physOpFactory);
+                }
             }
         }
         for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
-            computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), topLevelOp, context);
+            computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), topLevelOp, physOpFactory);
         }
     }
 
-    private static void getKeys(List<Mutable<ILogicalExpression>> keyExpressions, List<LogicalVariable> keys) {
-        for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
-            keys.add(getKey(kExpr.getValue()));
-        }
+    protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean> createPhysicalOperatorFactoryVisitor(
+            IOptimizationContext context) {
+        return new AlgebricksPhysicalOperatorFactoryVisitor(context);
     }
 
-    private static LogicalVariable getKey(ILogicalExpression keyExpression) {
-        if (keyExpression.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-            throw new NotImplementedException();
-        }
-        return ((VariableReferenceExpression) keyExpression).getVariableReference();
-    }
+    protected static class AlgebricksPhysicalOperatorFactoryVisitor
+            implements ILogicalOperatorVisitor<IPhysicalOperator, Boolean> {
 
-    private static LogicalVariable getKeysAndLoad(Mutable<ILogicalExpression> payloadExpr,
-            List<Mutable<ILogicalExpression>> keyExpressions, List<LogicalVariable> keys) {
-        LogicalVariable payload;
-        if (payloadExpr.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-            throw new NotImplementedException();
-        }
-        payload = ((VariableReferenceExpression) payloadExpr.getValue()).getVariableReference();
+        protected final IOptimizationContext context;
 
-        for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
-            ILogicalExpression e = kExpr.getValue();
-            if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+        protected final PhysicalOptimizationConfig physicalOptimizationConfig;
+
+        protected AlgebricksPhysicalOperatorFactoryVisitor(IOptimizationContext context) {
+            this.context = context;
+            this.physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
+        }
+
+        @Override
+        public IPhysicalOperator visitAggregateOperator(AggregateOperator op, Boolean topLevelOp) {
+            return new AggregatePOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) {
+            return new AssignPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitDistinctOperator(DistinctOperator distinct, Boolean topLevelOp) {
+            if (topLevelOp) {
+                return new PreSortedDistinctByPOperator(distinct.getDistinctByVarList());
+            } else {
+                return new MicroPreSortedDistinctByPOperator(distinct.getDistinctByVarList());
+            }
+        }
+
+        @Override
+        public IPhysicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Boolean topLevelOp) {
+            return new EmptyTupleSourcePOperator();
+        }
+
+        @Override
+        public final IPhysicalOperator visitGroupByOperator(GroupByOperator gby, Boolean topLevelOp)
+                throws AlgebricksException {
+
+            ensureAllVariables(gby.getGroupByList(), Pair::getSecond);
+
+            if (gby.getNestedPlans().size() == 1 && gby.getNestedPlans().get(0).getRoots().size() == 1) {
+                if (topLevelOp && ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE)
+                        || (gby.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE))) {
+                    ExternalGroupByPOperator extGby = createExternalGroupByPOperator(gby);
+                    if (extGby != null) {
+                        return extGby;
+                    }
+                }
+            }
+
+            if (topLevelOp) {
+                return new PreclusteredGroupByPOperator(gby.getGroupByVarList(), gby.isGroupAll(),
+                        context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+            } else {
+                return new MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
+                        context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+            }
+        }
+
+        protected ExternalGroupByPOperator createExternalGroupByPOperator(GroupByOperator gby)
+                throws AlgebricksException {
+            boolean hasIntermediateAgg = generateMergeAggregationExpressions(gby);
+            if (!hasIntermediateAgg) {
+                return null;
+            }
+            return new ExternalGroupByPOperator(gby.getGroupByVarList(),
+                    physicalOptimizationConfig.getMaxFramesForGroupBy(),
+                    (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
+                            * physicalOptimizationConfig.getFrameSize());
+        }
+
+        @Override
+        public IPhysicalOperator visitInnerJoinOperator(InnerJoinOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
+            return op.getPhysicalOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
+            return op.getPhysicalOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitLimitOperator(LimitOperator op, Boolean topLevelOp) {
+            return new StreamLimitPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Boolean topLevelOp) {
+            return new NestedTupleSourcePOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitOrderOperator(OrderOperator oo, Boolean topLevelOp) throws AlgebricksException {
+            ensureAllVariables(oo.getOrderExpressions(), Pair::getSecond);
+            if (topLevelOp) {
+                return new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK());
+            } else {
+                return new InMemoryStableSortPOperator();
+            }
+        }
+
+        @Override
+        public IPhysicalOperator visitProjectOperator(ProjectOperator op, Boolean topLevelOp) {
+            return new StreamProjectPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Boolean topLevelOp) {
+            return new RunningAggregatePOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitReplicateOperator(ReplicateOperator op, Boolean topLevelOp) {
+            return new ReplicatePOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitSplitOperator(SplitOperator op, Boolean topLevelOp) {
+            return new SplitPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitScriptOperator(ScriptOperator op, Boolean topLevelOp) {
+            return new StringStreamingScriptPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitSelectOperator(SelectOperator op, Boolean topLevelOp) {
+            return new StreamSelectPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitSubplanOperator(SubplanOperator op, Boolean topLevelOp) {
+            return new SubplanPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitUnionOperator(UnionAllOperator op, Boolean topLevelOp) {
+            if (topLevelOp) {
+                return new UnionAllPOperator();
+            } else {
+                return new MicroUnionAllPOperator();
+            }
+        }
+
+        @Override
+        public IPhysicalOperator visitIntersectOperator(IntersectOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            if (topLevelOp) {
+                return new IntersectPOperator();
+            } else {
+                throw AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, op.getSourceLocation(),
+                        op.getOperatorTag().toString() + " (micro)");
+            }
+        }
+
+        @Override
+        public IPhysicalOperator visitUnnestOperator(UnnestOperator op, Boolean topLevelOp) {
+            return new UnnestPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Boolean topLevelOp) {
+            return new LeftOuterUnnestPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitDataScanOperator(DataSourceScanOperator scan, Boolean topLevelOp) {
+            IDataSource dataSource = scan.getDataSource();
+            DataSourceScanPOperator dss = new DataSourceScanPOperator(dataSource);
+            if (dataSource.isScanAccessPathALeaf()) {
+                dss.disableJobGenBelowMe();
+            }
+            return dss;
+        }
+
+        @Override
+        public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean topLevelOp) {
+            return new SinkWritePOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitDistributeResultOperator(DistributeResultOperator op, Boolean topLevelOp) {
+            return new DistributeResultPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitWriteResultOperator(WriteResultOperator opLoad, Boolean topLevelOp) {
+            List<LogicalVariable> keys = new ArrayList<>();
+            List<LogicalVariable> additionalFilteringKeys = null;
+            LogicalVariable payload = getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getKeyExpressions(), keys);
+            if (opLoad.getAdditionalFilteringExpressions() != null) {
+                additionalFilteringKeys = new ArrayList<>();
+                getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
+            }
+            return new WriteResultPOperator(opLoad.getDataSource(), payload, keys, additionalFilteringKeys);
+        }
+
+        @Override
+        public IPhysicalOperator visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator opLoad,
+                Boolean topLevelOp) {
+            // Primary index
+            List<LogicalVariable> keys = new ArrayList<>();
+            List<LogicalVariable> additionalFilteringKeys = null;
+            List<LogicalVariable> additionalNonFilterVariables = null;
+            if (opLoad.getAdditionalNonFilteringExpressions() != null) {
+                additionalNonFilterVariables = new ArrayList<>();
+                getKeys(opLoad.getAdditionalNonFilteringExpressions(), additionalNonFilterVariables);
+            }
+            LogicalVariable payload =
+                    getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getPrimaryKeyExpressions(), keys);
+            if (opLoad.getAdditionalFilteringExpressions() != null) {
+                additionalFilteringKeys = new ArrayList<>();
+                getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
+            }
+            if (opLoad.isBulkload()) {
+                return new BulkloadPOperator(payload, keys, additionalFilteringKeys, additionalNonFilterVariables,
+                        opLoad.getDataSource());
+            } else {
+                return new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys, opLoad.getDataSource(),
+                        opLoad.getOperation(), additionalNonFilterVariables);
+            }
+        }
+
+        @Override
+        public IPhysicalOperator visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator opInsDel,
+                Boolean topLevelOp) {
+            // Secondary index
+            List<LogicalVariable> primaryKeys = new ArrayList<>();
+            List<LogicalVariable> secondaryKeys = new ArrayList<>();
+            List<LogicalVariable> additionalFilteringKeys = null;
+            getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
+            getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
+            if (opInsDel.getAdditionalFilteringExpressions() != null) {
+                additionalFilteringKeys = new ArrayList<>();
+                getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
+            }
+            if (opInsDel.isBulkload()) {
+                return new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
+                        opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex());
+            } else {
+                LogicalVariable upsertIndicatorVar = null;
+                List<LogicalVariable> prevSecondaryKeys = null;
+                LogicalVariable prevAdditionalFilteringKey = null;
+                if (opInsDel.getOperation() == Kind.UPSERT) {
+                    upsertIndicatorVar = getKey(opInsDel.getUpsertIndicatorExpr().getValue());
+                    prevSecondaryKeys = new ArrayList<>();
+                    getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
+                    if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
+                        prevAdditionalFilteringKey =
+                                ((VariableReferenceExpression) (opInsDel.getPrevAdditionalFilteringExpression())
+                                        .getValue()).getVariableReference();
+                    }
+                }
+                return new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
+                        opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(), upsertIndicatorVar,
+                        prevSecondaryKeys, prevAdditionalFilteringKey,
+                        opInsDel.getNumberOfAdditionalNonFilteringFields());
+            }
+        }
+
+        @Override
+        public IPhysicalOperator visitTokenizeOperator(TokenizeOperator opTokenize, Boolean topLevelOp)
+                throws AlgebricksException {
+            List<LogicalVariable> primaryKeys = new ArrayList<>();
+            List<LogicalVariable> secondaryKeys = new ArrayList<>();
+            getKeys(opTokenize.getPrimaryKeyExpressions(), primaryKeys);
+            getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
+            // Tokenize Operator only operates with a bulk load on a data set with an index
+            if (!opTokenize.isBulkload()) {
+                throw AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, opTokenize.getSourceLocation(),
+                        opTokenize.getOperatorTag().toString() + " (no bulkload)");
+            }
+            return new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize.getDataSourceIndex());
+        }
+
+        @Override
+        public IPhysicalOperator visitSinkOperator(SinkOperator op, Boolean topLevelOp) {
+            return new SinkPOperator();
+        }
+
+        @Override
+        public IPhysicalOperator visitForwardOperator(ForwardOperator op, Boolean topLevelOp) {
+            return new SortForwardPOperator();
+        }
+
+        @Override
+        public final IPhysicalOperator visitWindowOperator(WindowOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            ensureAllVariables(op.getPartitionExpressions(), v -> v);
+            ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
+            return createWindowPOperator(op);
+        }
+
+        protected WindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException {
+            return new WindowPOperator(op.getPartitionVarList(), true, op.getOrderColumnList(), false, false, false,
+                    context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+        }
+
+        // Physical operators for these operators must have been set already by rules that introduced them
+
+        @Override
+        public IPhysicalOperator visitDelegateOperator(DelegateOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET, op.getSourceLocation(),
+                    op.getOperatorTag());
+        }
+
+        @Override
+        public IPhysicalOperator visitExchangeOperator(ExchangeOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET, op.getSourceLocation(),
+                    op.getOperatorTag());
+        }
+
+        @Override
+        public IPhysicalOperator visitMaterializeOperator(MaterializeOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET, op.getSourceLocation(),
+                    op.getOperatorTag());
+        }
+
+        // Physical operators for these operators cannot be instantiated by Algebricks
+
+        @Override
+        public IPhysicalOperator visitUnnestMapOperator(UnnestMapOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            throw AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, op.getSourceLocation(),
+                    op.getOperatorTag());
+        }
+
+        @Override
+        public IPhysicalOperator visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            throw AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, op.getSourceLocation(),
+                    op.getOperatorTag());
+        }
+
+        // Helper methods
+
+        private static void getKeys(List<Mutable<ILogicalExpression>> keyExpressions, List<LogicalVariable> keys) {
+            for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
+                keys.add(getKey(kExpr.getValue()));
+            }
+        }
+
+        private static LogicalVariable getKey(ILogicalExpression keyExpression) {
+            if (keyExpression.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
                 throw new NotImplementedException();
             }
-            keys.add(((VariableReferenceExpression) e).getVariableReference());
-        }
-        return payload;
-    }
-
-    private static boolean generateMergeAggregationExpressions(GroupByOperator gby, IOptimizationContext context)
-            throws AlgebricksException {
-        if (gby.getNestedPlans().size() != 1) {
-            //External/Sort group-by currently works only for one nested plan with one root containing
-            //an aggregate and a nested-tuple-source.
-            throw new AlgebricksException(
-                    "External group-by currently works only for one nested plan with one root containing"
-                            + "an aggregate and a nested-tuple-source.");
-        }
-        ILogicalPlan p0 = gby.getNestedPlans().get(0);
-        if (p0.getRoots().size() != 1) {
-            //External/Sort group-by currently works only for one nested plan with one root containing
-            //an aggregate and a nested-tuple-source.
-            throw new AlgebricksException(
-                    "External group-by currently works only for one nested plan with one root containing"
-                            + "an aggregate and a nested-tuple-source.");
-        }
-        IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
-                context.getMergeAggregationExpressionFactory();
-        Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
-        AbstractLogicalOperator r0Logical = (AbstractLogicalOperator) r0.getValue();
-        if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
-            return false;
+            return ((VariableReferenceExpression) keyExpression).getVariableReference();
         }
 
-        // Check whether there are multiple aggregates in the sub plan.
-        ILogicalOperator r1Logical = r0Logical;
-        while (r1Logical.hasInputs()) {
-            r1Logical = r1Logical.getInputs().get(0).getValue();
-            if (r1Logical.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+        private static LogicalVariable getKeysAndLoad(Mutable<ILogicalExpression> payloadExpr,
+                List<Mutable<ILogicalExpression>> keyExpressions, List<LogicalVariable> keys) {
+            LogicalVariable payload;
+            if (payloadExpr.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new NotImplementedException();
+            }
+            payload = ((VariableReferenceExpression) payloadExpr.getValue()).getVariableReference();
+
+            for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
+                ILogicalExpression e = kExpr.getValue();
+                if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                    throw new NotImplementedException();
+                }
+                keys.add(((VariableReferenceExpression) e).getVariableReference());
+            }
+            return payload;
+        }
+
+        private boolean generateMergeAggregationExpressions(GroupByOperator gby) throws AlgebricksException {
+            if (gby.getNestedPlans().size() != 1) {
+                //External/Sort group-by currently works only for one nested plan with one root containing
+                //an aggregate and a nested-tuple-source.
+                throw new AlgebricksException(
+                        "External group-by currently works only for one nested plan with one root containing"
+                                + "an aggregate and a nested-tuple-source.");
+            }
+            ILogicalPlan p0 = gby.getNestedPlans().get(0);
+            if (p0.getRoots().size() != 1) {
+                //External/Sort group-by currently works only for one nested plan with one root containing
+                //an aggregate and a nested-tuple-source.
+                throw new AlgebricksException(
+                        "External group-by currently works only for one nested plan with one root containing"
+                                + "an aggregate and a nested-tuple-source.");
+            }
+            IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
+                    context.getMergeAggregationExpressionFactory();
+            Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+            AbstractLogicalOperator r0Logical = (AbstractLogicalOperator) r0.getValue();
+            if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
                 return false;
             }
+
+            // Check whether there are multiple aggregates in the sub plan.
+            ILogicalOperator r1Logical = r0Logical;
+            while (r1Logical.hasInputs()) {
+                r1Logical = r1Logical.getInputs().get(0).getValue();
+                if (r1Logical.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                    return false;
+                }
+            }
+
+            AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+            List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
+            List<LogicalVariable> originalAggVars = aggOp.getVariables();
+            int n = aggOp.getExpressions().size();
+            List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<>();
+            for (int i = 0; i < n; i++) {
+                ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
+                        .createMergeAggregation(originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
+                if (mergeExpr == null) {
+                    return false;
+                }
+                mergeExpressionRefs.add(new MutableObject<>(mergeExpr));
+            }
+            aggOp.setMergeExpressions(mergeExpressionRefs);
+            return true;
         }
 
-        AggregateOperator aggOp = (AggregateOperator) r0.getValue();
-        List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
-        List<LogicalVariable> originalAggVars = aggOp.getVariables();
-        int n = aggOp.getExpressions().size();
-        List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
-        for (int i = 0; i < n; i++) {
-            ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
-                    .createMergeAggregation(originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
-            if (mergeExpr == null) {
-                return false;
+        static <E> void ensureAllVariables(Collection<E> exprList, Function<E, Mutable<ILogicalExpression>> accessor)
+                throws AlgebricksException {
+            for (E item : exprList) {
+                ILogicalExpression e = accessor.apply(item).getValue();
+                if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                    throw AlgebricksException.create(ErrorCode.EXPR_NOT_NORMALIZED, e.getSourceLocation());
+                }
             }
-            mergeExpressionRefs.add(new MutableObject<ILogicalExpression>(mergeExpr));
         }
-        aggOp.setMergeExpressions(mergeExpressionRefs);
-        return true;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index bf34664..a31aef2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -161,7 +161,8 @@
     public static final int CANNOT_COMPOSE_PART_CONSTRAINTS = 10001;
     public static final int PHYS_OPERATOR_NOT_SET = 10002;
     public static final int DESCRIPTOR_GENERATION_ERROR = 10003;
-    public static final int ORDER_EXPR_NOT_NORMALIZED = 10004;
+    public static final int EXPR_NOT_NORMALIZED = 10004;
+    public static final int OPERATOR_NOT_IMPLEMENTED = 10005;
 
     private static class Holder {
         private static final Map<Integer, String> errorMessageMap;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index b4f7973..8e3b85e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -143,4 +143,5 @@
 10001 = Cannot compose partition constraint %1$s with %2$s
 10002 = Physical operator not set for operator: %1$s
 10003 = Could not generate operator descriptor for operator %1$s
-10004 = Order expression has not been normalized
+10004 = Expression has not been normalized
+10005 = Operator is not implemented: %1$s
