diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index b26eaca..69eecfd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -56,8 +56,10 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowStreamPOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 import org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
@@ -241,19 +243,24 @@
         }
 
         @Override
-        public WindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException {
-            boolean partitionMaterialization = winOp.hasNestedPlans() || AnalysisUtil.hasFunctionWithProperty(winOp,
-                    BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
-            boolean frameStartIsMonotonic = AnalysisUtil
-                    .isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions());
-            boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
-                    winOp.getFrameValueExpressions());
-            boolean nestedTrivialAggregates = winOp.hasNestedPlans()
-                    && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
-
-            return new WindowPOperator(winOp.getPartitionVarList(), partitionMaterialization,
-                    winOp.getOrderColumnList(), frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates,
-                    context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+        public AbstractWindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException {
+            if (winOp.hasNestedPlans()) {
+                boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(
+                        winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions());
+                boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(
+                        winOp.getFrameEndExpressions(), winOp.getFrameValueExpressions());
+                boolean nestedTrivialAggregates =
+                        winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
+                return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(),
+                        frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates,
+                        context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+            } else if (AnalysisUtil.hasFunctionWithProperty(winOp,
+                    BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION)) {
+                return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), false, false, false,
+                        context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+            } else {
+                return new WindowStreamPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList());
+            }
         }
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index 6cba1b1..a2c1c33 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -25,7 +25,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
 
 public class OperatorResourcesComputer {
 
@@ -146,10 +145,10 @@
     }
 
     private long getWindowRequiredMemory(WindowOperator op) {
-        WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
         // memory budget configuration only applies to window operators that materialize partitions (non-streaming)
         // streaming window operators only need 2 frames: output + (conservative estimate) last frame partition columns
-        long memorySize = physOp.isPartitionMaterialization() ? windowMemorySize : 2 * frameSize;
+        long memorySize = op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.WINDOW_STREAM ? 2 * frameSize
+                : windowMemorySize;
         return getOperatorRequiredMemory(op, memorySize);
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
index c0fca94..024a13e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
@@ -318,7 +318,7 @@
         WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
         visitInternal(op, true);
         addOutputBuffer(op); // + previous frame
-        if (physOp.isPartitionMaterialization()) {
+        if (physOp.getOperatorTag() == PhysicalOperatorTag.WINDOW) {
             addOutputBuffer(op); // + run frame
         }
         return null;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
index e452d03..c12faf5 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
@@ -11,7 +11,7 @@
                           -- AGGREGATE  |LOCAL|
                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                         }
-                  -- WINDOW  |PARTITIONED|
+                  -- WINDOW_STREAM  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                       -- STABLE_SORT [$$34(ASC), $$48(ASC)]  |PARTITIONED|
                         -- HASH_PARTITION_EXCHANGE [$$34]  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
index ab78ecc..a1e04ad 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |LOCAL|
       -- ASSIGN  |LOCAL|
         -- WINDOW  |LOCAL|
-          -- WINDOW  |LOCAL|
+          -- WINDOW_STREAM  |LOCAL|
             -- ONE_TO_ONE_EXCHANGE  |LOCAL|
               -- STABLE_SORT [$$m(ASC), $$t(ASC)]  |LOCAL|
                 -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
index 5b3d480..b111336 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
@@ -8,7 +8,7 @@
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- WINDOW  |LOCAL|
+            -- WINDOW_STREAM  |LOCAL|
               -- ONE_TO_ONE_EXCHANGE  |LOCAL|
                 -- STABLE_SORT [$$m(ASC), $$t(ASC)]  |LOCAL|
                   -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 8e1f77f..84d19c1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -78,5 +78,6 @@
     UPDATE,
     WRITE_RESULT,
     INTERSECT,
-    WINDOW
+    WINDOW,
+    WINDOW_STREAM
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
new file mode 100644
index 0000000..7065b70
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
+public abstract class AbstractWindowPOperator extends AbstractPhysicalOperator {
+
+    private final List<LogicalVariable> partitionColumns;
+
+    private final List<OrderColumn> orderColumns;
+
+    AbstractWindowPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) {
+        this.partitionColumns = partitionColumns;
+        this.orderColumns = orderColumns;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
+        IPartitioningProperty pp;
+        switch (op.getExecutionMode()) {
+            case PARTITIONED:
+                pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns),
+                        context.getComputationNodeDomain());
+                break;
+            case UNPARTITIONED:
+                pp = IPartitioningProperty.UNPARTITIONED;
+                break;
+            case LOCAL:
+                pp = null;
+                break;
+            default:
+                throw new IllegalStateException(op.getExecutionMode().name());
+        }
+
+        // require local order property [pc1, ... pcN, oc1, ... ocN]
+        // accounting for cases where there's an overlap between order and partition columns
+        // TODO replace with required local grouping on partition columns + local order on order columns
+        List<OrderColumn> lopColumns = new ArrayList<>();
+        ListSet<LogicalVariable> pcVars = new ListSet<>();
+        pcVars.addAll(partitionColumns);
+        for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
+            OrderColumn oc = orderColumns.get(oIdx);
+            LogicalVariable ocVar = oc.getColumn();
+            if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) {
+                throw new AlgebricksException(ErrorCode.HYRACKS, ErrorCode.UNSUPPORTED_WINDOW_SPEC,
+                        op.getSourceLocation(), String.valueOf(partitionColumns), String.valueOf(orderColumns));
+            }
+            lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
+        }
+        int pIdx = 0;
+        for (LogicalVariable pColumn : pcVars) {
+            lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC));
+        }
+        List<ILocalStructuralProperty> localProps =
+                lopColumns.isEmpty() ? null : Collections.singletonList(new LocalOrderProperty(lopColumns));
+
+        return new PhysicalRequirements(
+                new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        WindowOperator winOp = (WindowOperator) op;
+
+        int[] partitionColumnsList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
+
+        IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op);
+        IBinaryComparatorFactory[] partitionComparatorFactories =
+                JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns, opTypeEnv, context);
+
+        //TODO not all functions need order comparators
+        IBinaryComparatorFactory[] orderComparatorFactories =
+                JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv, context);
+
+        IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue());
+        IExpressionRuntimeProvider exprRuntimeProvider = context.getExpressionRuntimeProvider();
+        IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = context.getBinaryComparatorFactoryProvider();
+
+        List<Mutable<ILogicalExpression>> frameStartExprList = winOp.getFrameStartExpressions();
+        IScalarEvaluatorFactory[] frameStartExprEvals =
+                createEvaluatorFactories(frameStartExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+        List<Mutable<ILogicalExpression>> frameStartValidationExprList = winOp.getFrameStartValidationExpressions();
+        IScalarEvaluatorFactory[] frameStartValidationExprEvals = createEvaluatorFactories(frameStartValidationExprList,
+                inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+        List<Mutable<ILogicalExpression>> frameEndExprList = winOp.getFrameEndExpressions();
+        IScalarEvaluatorFactory[] frameEndExprEvals =
+                createEvaluatorFactories(frameEndExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+        List<Mutable<ILogicalExpression>> frameEndValidationExprList = winOp.getFrameEndValidationExpressions();
+        IScalarEvaluatorFactory[] frameEndValidationExprEvals = createEvaluatorFactories(frameEndValidationExprList,
+                inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+        List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList =
+                winOp.getFrameValueExpressions();
+        Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameValueExprEvalsAndComparators =
+                createEvaluatorAndComparatorFactories(frameValueExprList, Pair::getSecond, Pair::getFirst, inputSchemas,
+                        inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
+
+        List<Mutable<ILogicalExpression>> frameExcludeExprList = winOp.getFrameExcludeExpressions();
+        Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameExcludeExprEvalsAndComparators =
+                createEvaluatorAndComparatorFactories(frameExcludeExprList, v -> v, v -> OrderOperator.ASC_ORDER,
+                        inputSchemas, inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
+
+        IScalarEvaluatorFactory frameOffsetExprEval = null;
+        ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue();
+        if (frameOffsetExpr != null) {
+            frameOffsetExprEval =
+                    exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv, inputSchemas, context);
+        }
+
+        int[] projectionColumnsExcludingSubplans = JobGenHelper.projectAllVariables(opSchema);
+
+        int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables());
+
+        List<Mutable<ILogicalExpression>> runningAggExprs = winOp.getExpressions();
+        int runningAggExprCount = runningAggExprs.size();
+        IRunningAggregateEvaluatorFactory[] runningAggFactories =
+                new IRunningAggregateEvaluatorFactory[runningAggExprCount];
+        for (int i = 0; i < runningAggExprCount; i++) {
+            StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) runningAggExprs.get(i).getValue();
+            runningAggFactories[i] = exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv,
+                    inputSchemas, context);
+        }
+
+        int nestedAggOutSchemaSize = 0;
+        WindowAggregatorDescriptorFactory nestedAggFactory = null;
+        if (winOp.hasNestedPlans()) {
+            int opSchemaSizePreSubplans = opSchema.getSize();
+            AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context);
+            nestedAggOutSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans;
+            nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
+            nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
+        }
+
+        AbstractWindowRuntimeFactory runtime = createRuntimeFactory(winOp, partitionColumnsList,
+                partitionComparatorFactories, orderComparatorFactories, frameValueExprEvalsAndComparators.first,
+                frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartValidationExprEvals,
+                frameEndExprEvals, frameEndValidationExprEvals, frameExcludeExprEvalsAndComparators.first,
+                frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, projectionColumnsExcludingSubplans,
+                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, context);
+        runtime.setSourceLocation(winOp.getSourceLocation());
+
+        // contribute one Asterix framewriter
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv, opSchema, context);
+        builder.contributeMicroOperator(winOp, runtime, recDesc);
+        // and contribute one edge from its child
+        ILogicalOperator src = winOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, winOp, 0);
+    }
+
+    protected abstract AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp,
+            int[] partitionColumnsList, IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
+            IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals,
+            IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals,
+            IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals,
+            IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval,
+            int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+            WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context);
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+
+    private IScalarEvaluatorFactory[] createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment inputTypeEnv,
+            IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext context) throws AlgebricksException {
+        if (exprList.isEmpty()) {
+            return null;
+        }
+        int ln = exprList.size();
+        IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
+        for (int i = 0; i < ln; i++) {
+            ILogicalExpression expr = exprList.get(i).getValue();
+            evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context);
+        }
+        return evals;
+    }
+
+    private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> createEvaluatorAndComparatorFactories(
+            List<T> exprList, Function<T, Mutable<ILogicalExpression>> exprGetter,
+            Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider exprRuntimeProvider,
+            IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider, JobGenContext context)
+            throws AlgebricksException {
+        if (exprList.isEmpty()) {
+            return new Pair<>(null, null);
+        }
+        int ln = exprList.size();
+        IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
+        IBinaryComparatorFactory[] comparators = new IBinaryComparatorFactory[ln];
+        for (int i = 0; i < ln; i++) {
+            T exprObj = exprList.get(i);
+            ILogicalExpression expr = exprGetter.apply(exprObj).getValue();
+            OrderOperator.IOrder order = orderGetter.apply(exprObj);
+            evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context);
+            comparators[i] = binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr),
+                    order.getKind() == OrderOperator.IOrder.OrderKind.ASC);
+        }
+        return new Pair<>(evals, comparators);
+    }
+
+    private static boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) {
+        for (int i = startIdx, ln = ocList.size(); i < ln; i++) {
+            if (varSet.contains(ocList.get(i).getColumn())) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 8bd4610..23853e8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -19,42 +19,13 @@
 
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-import java.util.function.Function;
 
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
@@ -63,18 +34,9 @@
 import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRunningRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansUnboundedRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.ErrorCode;
 
-public class WindowPOperator extends AbstractPhysicalOperator {
-
-    private final List<LogicalVariable> partitionColumns;
-
-    private final boolean partitionMaterialization;
-
-    private final List<OrderColumn> orderColumns;
+public final class WindowPOperator extends AbstractWindowPOperator {
 
     private final boolean frameStartIsMonotonic;
 
@@ -85,12 +47,10 @@
     // The maximum number of in-memory frames that this operator can use.
     private final int memSizeInFrames;
 
-    public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
-            List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, boolean frameEndIsMonotonic,
-            boolean nestedTrivialAggregates, int memSizeInFrames) {
-        this.partitionColumns = partitionColumns;
-        this.partitionMaterialization = partitionMaterialization;
-        this.orderColumns = orderColumns;
+    public WindowPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns,
+            boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, boolean nestedTrivialAggregates,
+            int memSizeInFrames) {
+        super(partitionColumns, orderColumns);
         this.frameStartIsMonotonic = frameStartIsMonotonic;
         this.frameEndIsMonotonic = frameEndIsMonotonic;
         this.nestedTrivialAggregates = nestedTrivialAggregates;
@@ -103,245 +63,55 @@
     }
 
     @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
-        IPartitioningProperty pp;
-        switch (op.getExecutionMode()) {
-            case PARTITIONED:
-                pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns),
-                        context.getComputationNodeDomain());
-                break;
-            case UNPARTITIONED:
-                pp = IPartitioningProperty.UNPARTITIONED;
-                break;
-            case LOCAL:
-                pp = null;
-                break;
-            default:
-                throw new IllegalStateException(op.getExecutionMode().name());
-        }
+    protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
+            IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
+            IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals,
+            IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals,
+            IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals,
+            IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval,
+            int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+            WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context) {
 
-        // require local order property [pc1, ... pcN, oc1, ... ocN]
-        // accounting for cases where there's an overlap between order and partition columns
-        // TODO replace with required local grouping on partition columns + local order on order columns
-        List<OrderColumn> lopColumns = new ArrayList<>();
-        ListSet<LogicalVariable> pcVars = new ListSet<>();
-        pcVars.addAll(partitionColumns);
-        for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
-            OrderColumn oc = orderColumns.get(oIdx);
-            LogicalVariable ocVar = oc.getColumn();
-            if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) {
-                throw new AlgebricksException(ErrorCode.HYRACKS, ErrorCode.UNSUPPORTED_WINDOW_SPEC,
-                        op.getSourceLocation(), String.valueOf(partitionColumns), String.valueOf(orderColumns));
-            }
-            lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
-        }
-        int pIdx = 0;
-        for (LogicalVariable pColumn : pcVars) {
-            lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC));
-        }
-        List<ILocalStructuralProperty> localProps =
-                lopColumns.isEmpty() ? null : Collections.singletonList(new LocalOrderProperty(lopColumns));
-
-        return new PhysicalRequirements(
-                new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) },
-                IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
-    }
-
-    @Override
-    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
-            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
-        WindowOperator winOp = (WindowOperator) op;
-
-        int[] partitionColumnsList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
-
-        IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op);
-        IBinaryComparatorFactory[] partitionComparatorFactories =
-                JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns, opTypeEnv, context);
-
-        //TODO not all functions need order comparators
-        IBinaryComparatorFactory[] orderComparatorFactories =
-                JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv, context);
-
-        IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue());
-        IExpressionRuntimeProvider exprRuntimeProvider = context.getExpressionRuntimeProvider();
-        IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = context.getBinaryComparatorFactoryProvider();
-
-        List<Mutable<ILogicalExpression>> frameStartExprList = winOp.getFrameStartExpressions();
-        IScalarEvaluatorFactory[] frameStartExprEvals =
-                createEvaluatorFactories(frameStartExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
-        List<Mutable<ILogicalExpression>> frameStartValidationExprList = winOp.getFrameStartValidationExpressions();
-        IScalarEvaluatorFactory[] frameStartValidationExprEvals = createEvaluatorFactories(frameStartValidationExprList,
-                inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
-        List<Mutable<ILogicalExpression>> frameEndExprList = winOp.getFrameEndExpressions();
-        IScalarEvaluatorFactory[] frameEndExprEvals =
-                createEvaluatorFactories(frameEndExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
-        List<Mutable<ILogicalExpression>> frameEndValidationExprList = winOp.getFrameEndValidationExpressions();
-        IScalarEvaluatorFactory[] frameEndValidationExprEvals = createEvaluatorFactories(frameEndValidationExprList,
-                inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
-        List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList =
-                winOp.getFrameValueExpressions();
-        Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameValueExprEvalsAndComparators =
-                createEvaluatorAndComparatorFactories(frameValueExprList, Pair::getSecond, Pair::getFirst, inputSchemas,
-                        inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
-
-        List<Mutable<ILogicalExpression>> frameExcludeExprList = winOp.getFrameExcludeExpressions();
-        Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameExcludeExprEvalsAndComparators =
-                createEvaluatorAndComparatorFactories(frameExcludeExprList, v -> v, v -> OrderOperator.ASC_ORDER,
-                        inputSchemas, inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
-
-        IScalarEvaluatorFactory frameOffsetExprEval = null;
-        ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue();
-        if (frameOffsetExpr != null) {
-            frameOffsetExprEval =
-                    exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv, inputSchemas, context);
-        }
-
-        int[] projectionColumnsExcludingSubplans = JobGenHelper.projectAllVariables(opSchema);
-
-        int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables());
-
-        List<Mutable<ILogicalExpression>> runningAggExprs = winOp.getExpressions();
-        int runningAggExprCount = runningAggExprs.size();
-        IRunningAggregateEvaluatorFactory[] runningAggFactories =
-                new IRunningAggregateEvaluatorFactory[runningAggExprCount];
-        for (int i = 0; i < runningAggExprCount; i++) {
-            StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) runningAggExprs.get(i).getValue();
-            runningAggFactories[i] = exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv,
-                    inputSchemas, context);
-        }
-
-        AbstractWindowRuntimeFactory runtime = null;
-        if (winOp.hasNestedPlans()) {
-            int opSchemaSizePreSubplans = opSchema.getSize();
-            AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context);
-            int aggregatorOutputSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans;
-            WindowAggregatorDescriptorFactory nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
-            nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
-
-            int frameMaxObjects = winOp.getFrameMaxObjects();
-
-            // special cases
-            if (frameStartExprList.isEmpty() && frameExcludeExprList.isEmpty() && frameOffsetExpr == null) {
-                if (frameEndExprList.isEmpty()) {
-                    // special case #1: frame == whole partition, no exclusions, no offset
-                    runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
-                            partitionComparatorFactories, orderComparatorFactories, frameMaxObjects,
-                            projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
-                            aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
-                } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
-                    // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
-                    //                  trivial aggregate subplan ( aggregate + nts )
-                    nestedAggFactory.setPartialOutputEnabled(true);
-                    runtime = new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList,
-                            partitionComparatorFactories, orderComparatorFactories,
-                            frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second,
-                            frameEndExprEvals, frameEndValidationExprEvals, frameMaxObjects,
-                            context.getBinaryBooleanInspectorFactory(), projectionColumnsExcludingSubplans,
-                            runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory,
-                            memSizeInFrames);
-                }
-            }
-            // default case
-            if (runtime == null) {
-                runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
-                        orderComparatorFactories, frameValueExprEvalsAndComparators.first,
-                        frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartValidationExprEvals,
-                        frameStartIsMonotonic, frameEndExprEvals, frameEndValidationExprEvals,
-                        frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(),
-                        frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, frameMaxObjects,
-                        context.getBinaryBooleanInspectorFactory(), context.getBinaryIntegerInspectorFactory(),
-                        projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
-                        aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
-            }
-        } else if (partitionMaterialization) {
-            runtime = new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+        // special cases
+        if (!winOp.hasNestedPlans()) {
+            return new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
                     orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
                     runningAggFactories, memSizeInFrames);
-        } else {
-            runtime = new WindowSimpleRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
-                    orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
-                    runningAggFactories);
         }
-        runtime.setSourceLocation(winOp.getSourceLocation());
 
-        // contribute one Asterix framewriter
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv, opSchema, context);
-        builder.contributeMicroOperator(winOp, runtime, recDesc);
-        // and contribute one edge from its child
-        ILogicalOperator src = winOp.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src, 0, winOp, 0);
-    }
-
-    @Override
-    public boolean isMicroOperator() {
-        return true;
-    }
-
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return true;
-    }
-
-    public boolean isPartitionMaterialization() {
-        return partitionMaterialization;
-    }
-
-    private IScalarEvaluatorFactory[] createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment inputTypeEnv,
-            IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext context) throws AlgebricksException {
-        if (exprList.isEmpty()) {
-            return null;
-        }
-        int ln = exprList.size();
-        IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
-        for (int i = 0; i < ln; i++) {
-            ILogicalExpression expr = exprList.get(i).getValue();
-            evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context);
-        }
-        return evals;
-    }
-
-    private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> createEvaluatorAndComparatorFactories(
-            List<T> exprList, Function<T, Mutable<ILogicalExpression>> exprGetter,
-            Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[] inputSchemas,
-            IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider exprRuntimeProvider,
-            IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider, JobGenContext context)
-            throws AlgebricksException {
-        if (exprList.isEmpty()) {
-            return new Pair<>(null, null);
-        }
-        int ln = exprList.size();
-        IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
-        IBinaryComparatorFactory[] comparators = new IBinaryComparatorFactory[ln];
-        for (int i = 0; i < ln; i++) {
-            T exprObj = exprList.get(i);
-            ILogicalExpression expr = exprGetter.apply(exprObj).getValue();
-            OrderOperator.IOrder order = orderGetter.apply(exprObj);
-            evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context);
-            comparators[i] = binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr),
-                    order.getKind() == OrderOperator.IOrder.OrderKind.ASC);
-        }
-        return new Pair<>(evals, comparators);
-    }
-
-    private boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) {
-        for (int i = startIdx, ln = ocList.size(); i < ln; i++) {
-            if (varSet.contains(ocList.get(i).getColumn())) {
-                return true;
+        boolean hasFrameStart = frameStartExprEvals != null && frameStartExprEvals.length > 0;
+        boolean hasFrameEnd = frameEndExprEvals != null && frameEndExprEvals.length > 0;
+        boolean hasFrameExclude = frameExcludeExprEvals != null && frameExcludeExprEvals.length > 0;
+        boolean hasFrameOffset = frameOffsetExprEval != null;
+        if (!hasFrameStart && !hasFrameExclude && !hasFrameOffset) {
+            if (!hasFrameEnd) {
+                // special case #1: frame == whole partition, no exclusions, no offset
+                return new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+                        orderComparatorFactories, winOp.getFrameMaxObjects(), projectionColumnsExcludingSubplans,
+                        runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory,
+                        memSizeInFrames);
+            } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
+                // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
+                //                  trivial aggregate subplan ( aggregate + nts )
+                nestedAggFactory.setPartialOutputEnabled(true);
+                return new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+                        orderComparatorFactories, frameValueExprEvals, frameValueComparatorFactories, frameEndExprEvals,
+                        frameEndValidationExprEvals, winOp.getFrameMaxObjects(),
+                        context.getBinaryBooleanInspectorFactory(), projectionColumnsExcludingSubplans,
+                        runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory,
+                        memSizeInFrames);
             }
         }
-        return false;
+
+        // default case
+        return new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+                orderComparatorFactories, frameValueExprEvals, frameValueComparatorFactories, frameStartExprEvals,
+                frameStartValidationExprEvals, frameStartIsMonotonic, frameEndExprEvals, frameEndValidationExprEvals,
+                frameExcludeExprEvals, winOp.getFrameExcludeNegationStartIdx(), frameExcludeComparatorFactories,
+                frameOffsetExprEval, winOp.getFrameMaxObjects(), context.getBinaryBooleanInspectorFactory(),
+                context.getBinaryIntegerInspectorFactory(), projectionColumnsExcludingSubplans, runningAggOutColumns,
+                runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, memSizeInFrames);
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
new file mode 100644
index 0000000..33b47ec
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.WindowStreamRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public final class WindowStreamPOperator extends AbstractWindowPOperator {
+
+    public WindowStreamPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) {
+        super(partitionColumns, orderColumns);
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.WINDOW_STREAM;
+    }
+
+    @Override
+    protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
+            IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
+            IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals,
+            IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals,
+            IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals,
+            IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval,
+            int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+            WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context) {
+        return new WindowStreamRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+                orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
+                runningAggFactories);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index f127898..e6cdc28 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -79,6 +79,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.BulkloadPOperator;
@@ -468,8 +469,8 @@
             return createWindowPOperator(op);
         }
 
-        protected WindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException {
-            return new WindowPOperator(op.getPartitionVarList(), true, op.getOrderColumnList(), false, false, false,
+        protected AbstractWindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException {
+            return new WindowPOperator(op.getPartitionVarList(), op.getOrderColumnList(), false, false, false,
                     context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
         }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
similarity index 94%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
index f7f1a25..d23d4e7 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
@@ -30,9 +30,9 @@
 /**
  * Runtime for window operators that evaluates running aggregates without partition materialization.
  */
-class WindowSimplePushRuntime extends AbstractWindowPushRuntime {
+class WindowStreamPushRuntime extends AbstractWindowPushRuntime {
 
-    WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+    WindowStreamPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx,
             SourceLocation sourceLoc) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
similarity index 82%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
index 2d1cdde..be368a9 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
@@ -27,13 +27,14 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 
 /**
- * Runtime factory for window operators that evaluates running aggregates without partition materialization.
+ * Runtime factory for window operators that evaluates running aggregates in a streaming fashion
+ * (without partition materialization).
  */
-public class WindowSimpleRuntimeFactory extends AbstractWindowRuntimeFactory {
+public class WindowStreamRuntimeFactory extends AbstractWindowRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
-    public WindowSimpleRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+    public WindowStreamRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
             int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
@@ -42,13 +43,13 @@
 
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
-        return new WindowSimplePushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
+        return new WindowStreamPushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
                 projectionList, runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
     }
 
     @Override
     public String toString() {
-        return "window (" + Arrays.toString(partitionColumns) + ") " + Arrays.toString(runningAggOutColumns) + " := "
-                + Arrays.toString(runningAggFactories);
+        return "window-stream (" + Arrays.toString(partitionColumns) + ") " + Arrays.toString(runningAggOutColumns)
+                + " := " + Arrays.toString(runningAggFactories);
     }
 }
