[NO ISSUE][RT] Window operator runtime optimization

- user model changes: yes
- storage format changes: no
- interface changes: no

Details:
- Improve memory management for window operators
- Add "compiler.windowmemory" property that specifies memory
  budget for each window operator (default is 4MB, min is 160KB)
- Consolidated negative window operator testcases into a single one

Change-Id: I6756e92046883f79db339ef490cca8bc8b7b1fb8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3227
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index a1f819d..efffda2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -20,7 +20,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.asterix.algebra.operators.physical.BTreeSearchPOperator;
 import org.apache.asterix.algebra.operators.physical.InvertedIndexPOperator;
@@ -68,7 +67,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
@@ -285,7 +283,7 @@
                 }
                 case WINDOW: {
                     WindowOperator winOp = (WindowOperator) op;
-                    WindowPOperator physOp = createWindowPOperator(winOp);
+                    WindowPOperator physOp = createWindowPOperator(winOp, context);
                     op.setPhysicalOperator(physOp);
                     break;
                 }
@@ -344,7 +342,8 @@
         aggOp.setMergeExpressions(mergeExpressionRefs);
     }
 
-    private static WindowPOperator createWindowPOperator(WindowOperator winOp) throws CompilationException {
+    private static WindowPOperator createWindowPOperator(WindowOperator winOp, IOptimizationContext context)
+            throws CompilationException {
         List<Mutable<ILogicalExpression>> partitionExprs = winOp.getPartitionExpressions();
         List<LogicalVariable> partitionColumns = new ArrayList<>(partitionExprs.size());
         for (Mutable<ILogicalExpression> pe : partitionExprs) {
@@ -377,7 +376,9 @@
         boolean nestedTrivialAggregates = winOp.hasNestedPlans()
                 && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
 
+        int memSizeInFrames = context.getPhysicalOptimizationConfig().getMaxFramesForWindow();
+
         return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic,
-                frameEndIsMonotonic, nestedTrivialAggregates);
+                frameEndIsMonotonic, nestedTrivialAggregates, memSizeInFrames);
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 0bdc987..d155756 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -130,14 +130,14 @@
     public static final String PREFIX_INTERNAL_PARAMETERS = "_internal";
     private static final Set<String> CONFIGURABLE_PARAMETER_NAMES =
             ImmutableSet.of(CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY,
-                    CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY,
-                    CompilerProperties.COMPILER_PARALLELISM_KEY, CompilerProperties.COMPILER_SORT_PARALLEL_KEY,
-                    CompilerProperties.COMPILER_SORT_SAMPLES_KEY, FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
-                    FuzzyUtils.SIM_FUNCTION_PROP_NAME, FuzzyUtils.SIM_THRESHOLD_PROP_NAME,
-                    StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME,
-                    FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION,
-                    SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type",
-                    AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION);
+                    CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_WINDOWMEMORY_KEY,
+                    CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY,
+                    CompilerProperties.COMPILER_SORT_PARALLEL_KEY, CompilerProperties.COMPILER_SORT_SAMPLES_KEY,
+                    FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, FuzzyUtils.SIM_FUNCTION_PROP_NAME,
+                    FuzzyUtils.SIM_THRESHOLD_PROP_NAME, StartFeedStatement.WAIT_FOR_COMPLETION,
+                    FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS,
+                    SqlppQueryRewriter.INLINE_WITH_OPTION, SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION,
+                    "hash_merge", "output-record-type", AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION);
 
     private final IRewriterFactory rewriterFactory;
     private final IAstPrintVisitorFactory astPrintVisitorFactory;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index 2942a95..3fffdac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -36,15 +36,17 @@
     private final long groupByMemorySize;
     private final long joinMemorySize;
     private final long sortMemorySize;
+    private final long windowMemorySize;
     private final long textSearchMemorySize;
     private final long frameSize;
 
     public OperatorResourcesComputer(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit,
-            int joinFrameLimit, int textSearchFrameLimit, long frameSize) {
+            int joinFrameLimit, int windowFrameLimit, int textSearchFrameLimit, long frameSize) {
         this.numComputationPartitions = numComputationPartitions;
         this.groupByMemorySize = groupFrameLimit * frameSize;
         this.joinMemorySize = joinFrameLimit * frameSize;
         this.sortMemorySize = sortFrameLimit * frameSize;
+        this.windowMemorySize = windowFrameLimit * frameSize;
         this.textSearchMemorySize = textSearchFrameLimit * frameSize;
         this.frameSize = frameSize;
     }
@@ -145,13 +147,9 @@
 
     private long getWindowRequiredMemory(WindowOperator op) {
         WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
-        int frameCount = 2;
-        if (physOp.isPartitionMaterialization()) {
-            frameCount++;
-        }
-        if (op.hasNestedPlans()) {
-            frameCount += 2;
-        }
-        return getOperatorRequiredMemory(op, frameSize * frameCount);
+        // memory budget configuration only applies to window operators that materialize partitions (non-streaming)
+        // streaming window operators only need 2 frames: output + copy
+        long memorySize = physOp.isPartitionMaterialization() ? windowMemorySize : 2 * frameSize;
+        return getOperatorRequiredMemory(op, memorySize);
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index d9ead33..0f4c4c0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -58,10 +58,11 @@
         final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
         final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy();
         final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin();
+        final int windowFrameLimit = physicalOptimizationConfig.getMaxFramesForWindow();
         final int textSearchFrameLimit = physicalOptimizationConfig.getMaxFramesForTextSearch();
         final List<PlanStage> planStages = getStages(plan);
         return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, sortFrameLimit,
-                groupFrameLimit, joinFrameLimit, textSearchFrameLimit, frameSize);
+                groupFrameLimit, joinFrameLimit, windowFrameLimit, textSearchFrameLimit, frameSize);
     }
 
     public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException {
@@ -73,9 +74,10 @@
     }
 
     public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations,
-            int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int textSearchFrameLimit, int frameSize) {
+            int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int windowFrameLimit, int textSearchFrameLimit,
+            int frameSize) {
         final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, sortFrameLimit,
-                groupFrameLimit, joinFrameLimit, textSearchFrameLimit, frameSize);
+                groupFrameLimit, joinFrameLimit, windowFrameLimit, textSearchFrameLimit, frameSize);
         final IClusterCapacity clusterCapacity = new ClusterCapacity();
         final Long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
                 .orElseThrow(IllegalStateException::new);
diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf b/asterixdb/asterix-app/src/main/resources/cc.conf
index 8877be8..c9c7bb9 100644
--- a/asterixdb/asterix-app/src/main/resources/cc.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc.conf
@@ -53,6 +53,7 @@
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
 compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
 compiler.sort.parallel=false
 messaging.frame.size=4096
 messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/main/resources/cc2.conf b/asterixdb/asterix-app/src/main/resources/cc2.conf
index 65dbafc..46f7168 100644
--- a/asterixdb/asterix-app/src/main/resources/cc2.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc2.conf
@@ -53,6 +53,7 @@
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
 compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
 compiler.parallelism=-1
 messaging.frame.size=4096
 messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/main/resources/cc3.conf b/asterixdb/asterix-app/src/main/resources/cc3.conf
index 20aa70d..0b26ef3 100644
--- a/asterixdb/asterix-app/src/main/resources/cc3.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc3.conf
@@ -53,6 +53,7 @@
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
 compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
 compiler.parallelism=3
 messaging.frame.size=4096
 messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/main/resources/cc4.conf b/asterixdb/asterix-app/src/main/resources/cc4.conf
index 5bdf8ea..6a66d25 100644
--- a/asterixdb/asterix-app/src/main/resources/cc4.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc4.conf
@@ -50,6 +50,7 @@
 compiler.sortmemory=320KB
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
+compiler.windowmemory=192KB
 messaging.frame.size=4096
 messaging.frame.count=512
 compiler.parallelism=-1
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index d3113ca..094009e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -301,7 +301,7 @@
 
     private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) {
         final IClusterCapacity clusterCapacity = ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM,
-                FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE);
+                FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE);
         Assert.assertEquals(clusterCapacity.getAggregatedMemoryByteSize(), expectedMemory);
     }
 }
diff --git a/asterixdb/asterix-app/src/test/resources/cc-compression.conf b/asterixdb/asterix-app/src/test/resources/cc-compression.conf
index 904707a..e58d691 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-compression.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-compression.conf
@@ -53,5 +53,6 @@
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
 compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
 messaging.frame.size=4096
 messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/test/resources/cc-multipart.conf b/asterixdb/asterix-app/src/test/resources/cc-multipart.conf
index 9c64ab4..4d2087b 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-multipart.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-multipart.conf
@@ -51,5 +51,6 @@
 compiler.sortmemory=320KB
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
+compiler.windowmemory=192KB
 messaging.frame.size=4096
 messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
index 811a40d..8995e19 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
@@ -49,6 +49,7 @@
 compiler.sortmemory=320KB
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
+compiler.windowmemory=192KB
 messaging.frame.size=4096
 messaging.frame.count=512
 txn.log.partitionsize=2MB
diff --git a/asterixdb/asterix-app/src/test/resources/cc-ssl.conf b/asterixdb/asterix-app/src/test/resources/cc-ssl.conf
index ea00513..db04d2b 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-ssl.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-ssl.conf
@@ -64,6 +64,7 @@
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
 compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
 messaging.frame.size=4096
 messaging.frame.count=512
 ssl.enabled=true
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/cc-storage.conf b/asterixdb/asterix-app/src/test/resources/cc-storage.conf
index b6bed24..cf748c6 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-storage.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-storage.conf
@@ -51,6 +51,7 @@
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
 compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
 messaging.frame.size=4096
 messaging.frame.count=512
 txn.log.checkpoint.pollfrequency=10
diff --git a/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf b/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf
index ef38fc2..1d05c28 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf
@@ -53,6 +53,7 @@
 compiler.sortmemory=320KB
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
+compiler.windowmemory=192KB
 compiler.stringoffset=1
 messaging.frame.size=4096
 messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/test/resources/cc.conf b/asterixdb/asterix-app/src/test/resources/cc.conf
index 2694408..adb71a5 100644
--- a/asterixdb/asterix-app/src/test/resources/cc.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc.conf
@@ -53,5 +53,6 @@
 compiler.groupmemory=160KB
 compiler.joinmemory=256KB
 compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
 messaging.frame.size=4096
 messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp
index 5655f18..e58ba11 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp
@@ -21,6 +21,8 @@
  * Expected Res : SUCCESS
  */
 
+set `compiler.windowmemory` "256KB";
+
 use test;
 
 q1_ntile(10, 1000, 4)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp
index 3120eb3..d477713 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp
@@ -21,6 +21,8 @@
  * Expected Res : SUCCESS
  */
 
+set `compiler.windowmemory` "192KB";
+
 use test;
 
 q1_percent_rank(10, 1000, 3)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp
index 8f38718..8b92104 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp
@@ -21,6 +21,8 @@
  * Expected Res : SUCCESS
  */
 
+set `compiler.windowmemory` "512KB";
+
 use test;
 
 from q0_rnd() rnd
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp
index d73a7f4..36f40c6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp
@@ -21,6 +21,8 @@
  * Expected Res : SUCCESS
  */
 
+set `compiler.windowmemory` "224KB";
+
 use test;
 
 FROM tenk1
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_02/ntile_02.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.1.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_02/ntile_02.1.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.1.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ratio_to_report_02_negative/ratio_to_report_02_negative.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.2.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ratio_to_report_02_negative/ratio_to_report_02_negative.1.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.2.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.3.query.sqlpp
new file mode 100644
index 0000000..23be774
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.3.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Test illegal value for 'compiler.windowmemory'
+ *              : Must be at least 160KB (5 frames)
+ * Expected Res : FAILURE
+ */
+
+set `compiler.windowmemory` "100KB";
+
+from range(1, 10) t
+select t, first_value(t) over(order by t)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
index 2d561b9..1e4d076 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
@@ -22,6 +22,9 @@
  * Expected Res : SUCCESS
  */
 
+/* 1 frame for partition writer */
+set `compiler.windowmemory` "160KB";
+
 use test;
 
 q1_sum_1_preceding_1_following(10);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
index fec6158..4546867 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
@@ -22,6 +22,9 @@
  * Expected Res : SUCCESS
  */
 
+/* 1 frame for partition writer */
+set `compiler.windowmemory` "160KB";
+
 use test;
 
 q1_sum_1_preceding_1_following(10000);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
index 84b5234..d772965 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
@@ -22,6 +22,9 @@
  * Expected Res : SUCCESS
  */
 
+/* 2 frames for partition writer */
+set `compiler.windowmemory` "192KB";
+
 use test;
 
 with N as 10000, W as 5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
index 8a4374f..8c0c6b9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
@@ -23,6 +23,9 @@
  * Expected Res : SUCCESS
  */
 
+/* 2 frames for partition writer */
+set `compiler.windowmemory` "192KB";
+
 use test;
 
 with N as 10000, W as 5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
index 91c0a31..5f5c508 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
@@ -23,6 +23,9 @@
  * Expected Res : SUCCESS
  */
 
+/* 2 frames for partition writer */
+set `compiler.windowmemory` "192KB";
+
 use test;
 
 with N as 10000, W as 5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
index ad6913c..a2d3a53 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
@@ -22,6 +22,9 @@
  * Expected Res : SUCCESS
  */
 
+/* 1 frame for partition writer */
+set `compiler.windowmemory` "160KB";
+
 use test;
 
 q2_max_unbounded_preceding_n_following(5000);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index da464c7..2bdb6c1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -16,6 +16,7 @@
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
     "compiler\.textsearchmemory" : 163840,
+    "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
     "log\.dir" : "logs/",
     "log\.level" : "INFO",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index fa8f48e..599d6b4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -16,6 +16,7 @@
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
     "compiler\.textsearchmemory" : 163840,
+    "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
     "log\.dir" : "logs/",
     "log\.level" : "WARN",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 801900c..56d9dd9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -16,6 +16,7 @@
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
     "compiler\.textsearchmemory" : 163840,
+    "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
     "log\.dir" : "logs/",
     "log\.level" : "WARN",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index d368b45..c905e0c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -9278,13 +9278,6 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="window">
-      <compilation-unit name="ntile_02">
-        <output-dir compare="Text">ntile_01</output-dir>
-        <expected-error>ASX0002: Type mismatch</expected-error>
-        <source-location>false</source-location>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="window">
       <compilation-unit name="percent_rank_01">
         <output-dir compare="Text">percent_rank_01</output-dir>
       </compilation-unit>
@@ -9305,14 +9298,17 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="window">
-      <compilation-unit name="ratio_to_report_02_negative">
-        <output-dir compare="Text">ratio_to_report_01</output-dir>
-        <expected-error>ASX1101: Unexpected ORDER BY clause in window expression</expected-error>
+      <compilation-unit name="row_number_01">
+        <output-dir compare="Text">row_number_01</output-dir>
       </compilation-unit>
     </test-case>
     <test-case FilePath="window">
-      <compilation-unit name="row_number_01">
-        <output-dir compare="Text">row_number_01</output-dir>
+      <compilation-unit name="win_negative">
+        <output-dir compare="Text">misc_01</output-dir>
+        <expected-error>ASX0002: Type mismatch</expected-error>
+        <expected-error>ASX1101: Unexpected ORDER BY clause in window expression</expected-error>
+        <expected-error>ASX1037: Invalid query parameter compiler.windowmemory</expected-error>
+        <source-location>false</source-location>
       </compilation-unit>
     </test-case>
     <test-case FilePath="window">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 4bfbf11..7c67b96 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -43,6 +43,10 @@
                 LONG_BYTE_UNIT,
                 StorageUtil.getLongSizeInBytes(32L, MEGABYTE),
                 "The memory budget (in bytes) for a group by operator instance in a partition"),
+        COMPILER_WINDOWMEMORY(
+                LONG_BYTE_UNIT,
+                StorageUtil.getLongSizeInBytes(4L, MEGABYTE),
+                "The memory budget (in bytes) for a window operator instance in a partition"),
         COMPILER_TEXTSEARCHMEMORY(
                 LONG_BYTE_UNIT,
                 StorageUtil.getLongSizeInBytes(32L, MEGABYTE),
@@ -108,6 +112,8 @@
 
     public static final String COMPILER_JOINMEMORY_KEY = Option.COMPILER_JOINMEMORY.ini();
 
+    public static final String COMPILER_WINDOWMEMORY_KEY = Option.COMPILER_WINDOWMEMORY.ini();
+
     public static final String COMPILER_TEXTSEARCHMEMORY_KEY = Option.COMPILER_TEXTSEARCHMEMORY.ini();
 
     public static final String COMPILER_PARALLELISM_KEY = Option.COMPILER_PARALLELISM.ini();
@@ -134,6 +140,10 @@
         return accessor.getLong(Option.COMPILER_GROUPMEMORY);
     }
 
+    public long getWindowMemorySize() {
+        return accessor.getLong(Option.COMPILER_WINDOWMEMORY);
+    }
+
     public long getTextSearchMemorySize() {
         return accessor.getLong(Option.COMPILER_TEXTSEARCHMEMORY);
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index 9269b5e..e8643cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -33,6 +33,8 @@
     private static final int MIN_FRAME_LIMIT_FOR_SORT = 3;
     private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = 4;
     private static final int MIN_FRAME_LIMIT_FOR_JOIN = 5;
+    // 1 (output) + 1 (input copy) + 1 (partition writer) + 2 (seekable partition reader)
+    private static final int MIN_FRAME_LIMIT_FOR_WINDOW = 5;
     // one for query, two for intermediate results, one for final result, and one for reading an inverted list
     private static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5;
 
@@ -49,6 +51,9 @@
         int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
                 (String) querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
                 compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN, sourceLoc);
+        int windowFrameLimit = getFrameLimit(CompilerProperties.COMPILER_WINDOWMEMORY_KEY,
+                (String) querySpecificConfig.get(CompilerProperties.COMPILER_WINDOWMEMORY_KEY),
+                compilerProperties.getWindowMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_WINDOW, sourceLoc);
         int textSearchFrameLimit = getTextSearchNumFrames(compilerProperties, querySpecificConfig, sourceLoc);
         int sortNumSamples = getSortSamples(compilerProperties, querySpecificConfig, sourceLoc);
         boolean fullParallelSort = getSortParallel(compilerProperties, querySpecificConfig);
@@ -58,6 +63,7 @@
         physOptConf.setMaxFramesExternalSort(sortFrameLimit);
         physOptConf.setMaxFramesExternalGroupBy(groupFrameLimit);
         physOptConf.setMaxFramesForJoin(joinFrameLimit);
+        physOptConf.setMaxFramesForWindow(windowFrameLimit);
         physOptConf.setMaxFramesForTextSearch(textSearchFrameLimit);
         physOptConf.setSortParallel(fullParallelSort);
         physOptConf.setSortSamples(sortNumSamples);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 1d8c47c..9a5f6d6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -82,15 +82,19 @@
 
     private final boolean nestedTrivialAggregates;
 
+    // The maximum number of in-memory frames that this operator can use.
+    private final int memSizeInFrames;
+
     public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
             List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, boolean frameEndIsMonotonic,
-            boolean nestedTrivialAggregates) {
+            boolean nestedTrivialAggregates, int memSizeInFrames) {
         this.partitionColumns = partitionColumns;
         this.partitionMaterialization = partitionMaterialization;
         this.orderColumns = orderColumns;
         this.frameStartIsMonotonic = frameStartIsMonotonic;
         this.frameEndIsMonotonic = frameEndIsMonotonic;
         this.nestedTrivialAggregates = nestedTrivialAggregates;
+        this.memSizeInFrames = memSizeInFrames;
     }
 
     @Override
@@ -227,7 +231,7 @@
                     runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
                             partitionComparatorFactories, orderComparatorFactories, frameMaxObjects,
                             projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
-                            aggregatorOutputSchemaSize, nestedAggFactory);
+                            aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
                 } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
                     // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
                     //                  trivial aggregate subplan ( aggregate + nts )
@@ -236,7 +240,8 @@
                             partitionComparatorFactories, orderComparatorFactories,
                             frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second,
                             frameEndExprEvals, frameMaxObjects, projectionColumnsExcludingSubplans,
-                            runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory);
+                            runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory,
+                            memSizeInFrames);
                 }
             }
             // default case
@@ -248,12 +253,12 @@
                         winOp.getFrameExcludeNegationStartIdx(), frameExcludeExprEvalsAndComparators.second,
                         frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), frameMaxObjects,
                         projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
-                        aggregatorOutputSchemaSize, nestedAggFactory);
+                        aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
             }
         } else if (partitionMaterialization) {
             runtime = new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
                     orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
-                    runningAggFactories);
+                    runningAggFactories, memSizeInFrames);
         } else {
             runtime = new WindowSimpleRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
                     orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index d879d36..f9ea0c4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -30,6 +30,7 @@
     private static final String MAX_FRAMES_EXTERNAL_GROUP_BY = "MAX_FRAMES_EXTERNAL_GROUP_BY";
     private static final String MAX_FRAMES_FOR_JOIN_LEFT_INPUT = "MAX_FRAMES_FOR_JOIN_LEFT_INPUT";
     private static final String MAX_FRAMES_FOR_JOIN = "MAX_FRAMES_FOR_JOIN";
+    private static final String MAX_FRAMES_FOR_WINDOW = "MAX_FRAMES_FOR_WINDOW";
     private static final String MAX_FRAMES_FOR_TEXTSEARCH = "MAX_FRAMES_FOR_TEXTSEARCH";
     private static final String FUDGE_FACTOR = "FUDGE_FACTOR";
     private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME";
@@ -113,6 +114,15 @@
         setInt(MAX_FRAMES_EXTERNAL_SORT, frameLimit);
     }
 
+    public int getMaxFramesForWindow() {
+        int frameSize = getFrameSize();
+        return getInt(MAX_FRAMES_FOR_WINDOW, (int) (((long) 4 * MB) / frameSize));
+    }
+
+    public void setMaxFramesForWindow(int frameLimit) {
+        setInt(MAX_FRAMES_FOR_WINDOW, frameLimit);
+    }
+
     public int getMaxFramesForTextSearch() {
         int frameSize = getFrameSize();
         return getInt(MAX_FRAMES_FOR_TEXTSEARCH, (int) (((long) 32 * MB) / frameSize));
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
index 0449cf5..b6cd0eb 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
@@ -86,10 +86,11 @@
         return new ArrayTupleBuilder(projectionList.length);
     }
 
-    protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, int endIdx) throws HyracksDataException {
+    protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, int endIdx, FrameTupleReference tupleRef)
+            throws HyracksDataException {
         for (int t = beginIdx; t <= endIdx; t++) {
-            tRef.reset(accessor, t);
-            produceTuple(tupleBuilder, accessor, t, tRef);
+            tupleRef.reset(accessor, t);
+            produceTuple(tupleBuilder, accessor, t, tupleRef);
             appendToFrameFromTupleBuilder(tupleBuilder);
         }
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
index 4ca166f..fd3e97d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
@@ -36,6 +36,6 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         tAccess.reset(buffer);
-        produceTuples(tAccess, 0, tAccess.getTupleCount() - 1);
+        produceTuples(tAccess, 0, tAccess.getTupleCount() - 1, tRef);
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
index 9adeb4d..807f6bf 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -49,9 +50,10 @@
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
-            WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+            WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx, int memSizeInFrames,
+            SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, ctx);
+                runningAggOutColumns, runningAggFactories, ctx, memSizeInFrames, sourceLoc);
         this.nestedAggFactory = nestedAggFactory;
         this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
     }
@@ -75,23 +77,23 @@
 
     /**
      * Aggregator created by
-     * {@link WindowAggregatorDescriptorFactory#createAggregator(IHyracksTaskContext, RecordDescriptor, RecordDescriptor, int[], int[], long)
-     *        WindowAggregatorDescriptorFactory.createAggregator(...)}
+     * {@link WindowAggregatorDescriptorFactory#createAggregator(IHyracksTaskContext, RecordDescriptor,
+     * RecordDescriptor, int[], int[], long) WindowAggregatorDescriptorFactory.createAggregator(...)}
      * does not process argument tuple in init()
      */
-    void nestedAggInit() throws HyracksDataException {
+    final void nestedAggInit() throws HyracksDataException {
         nestedAgg.init(null, null, -1, null);
     }
 
-    void nestedAggAggregate(FrameTupleAccessor tAccess, int tIndex) throws HyracksDataException {
+    final void nestedAggAggregate(FrameTupleAccessor tAccess, int tIndex) throws HyracksDataException {
         nestedAgg.aggregate(tAccess, tIndex, null, -1, null);
     }
 
-    void nestedAggOutputFinalResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
+    final void nestedAggOutputFinalResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
         nestedAgg.outputFinalResult(outTupleBuilder, null, -1, null);
     }
 
-    void nestedAggOutputPartialResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
+    final void nestedAggOutputPartialResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
         nestedAgg.outputPartialResult(outTupleBuilder, null, -1, null);
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
index 53857ac..0b8cc06 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
@@ -25,7 +25,7 @@
 /**
  * Base class for window runtime factories that compute nested aggregates
  */
-abstract class AbstractWindowNestedPlansRuntimeFactory extends AbstractWindowRuntimeFactory {
+abstract class AbstractWindowNestedPlansRuntimeFactory extends WindowMaterializingRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -37,9 +37,9 @@
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
             int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
-            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory) {
+            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
-                projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+                projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, memSizeInFrames);
         this.nestedAggFactory = nestedAggFactory;
         this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
index a63aaf8..9cc25d0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
@@ -30,12 +30,14 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 
 public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> {
 
+    protected final SourceLocation sourceLoc;
     private final int[] partitionColumns;
     private final IBinaryComparatorFactory[] partitionComparatorFactories;
     private IBinaryComparator[] partitionComparators;
@@ -48,11 +50,20 @@
 
     AbstractWindowPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
-            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) {
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx,
+            SourceLocation sourceLoc) {
         super(projectionColumns, runningAggOutColumns, runningAggFactories, IWindowAggregateEvaluator.class, ctx);
         this.partitionColumns = partitionColumns;
         this.partitionComparatorFactories = partitionComparatorFactories;
         this.orderComparatorFactories = orderComparatorFactories;
+        this.sourceLoc = sourceLoc;
+    }
+
+    /**
+     * Number of frames reserved by this operator: {@link #frame}, {@link #copyFrame}
+     */
+    int getReservedFrameCount() {
+        return 2;
     }
 
     @Override
@@ -78,7 +89,7 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (inPartition) {
+        if (inPartition && !failed) {
             endPartition();
         }
         super.close();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
index 4e97d6c..889803a 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
@@ -22,16 +22,11 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.storage.common.arraylist.IntArrayList;
 
 /**
@@ -40,46 +35,42 @@
  */
 class WindowMaterializingPushRuntime extends AbstractWindowPushRuntime {
 
+    private final int memSizeInFrames;
+
     private long partitionLength;
 
-    IFrame curFrame;
+    private WindowPartitionWriter partitionWriter;
 
-    private long curFrameId;
+    WindowPartitionReader partitionReader;
 
     private int chunkBeginIdx;
 
     private IntArrayList chunkEndIdx;
 
-    private RunFileWriter run;
-
-    private long runLastFrameId;
-
     WindowMaterializingPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
-            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) {
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx, int memSizeInFrames,
+            SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, ctx);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        super.open();
-        run = null;
-        curFrameId = -1;
+                runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
+        this.memSizeInFrames = memSizeInFrames;
     }
 
     @Override
     protected void init() throws HyracksDataException {
         super.init();
-        curFrame = new VSizeFrame(ctx);
+        String runFilePrefix = getClass().getName();
+        partitionWriter = new WindowPartitionWriter(ctx, memSizeInFrames - getReservedFrameCount(), runFilePrefix,
+                getPartitionReaderSlotCount(), sourceLoc);
+        partitionReader = partitionWriter.getReader();
         chunkEndIdx = new IntArrayList(128, 128);
     }
 
     @Override
     public void close() throws HyracksDataException {
         super.close();
-        if (run != null) {
-            run.erase();
+        if (partitionWriter != null) {
+            partitionWriter.close();
         }
     }
 
@@ -87,41 +78,17 @@
     protected void beginPartitionImpl() throws HyracksDataException {
         chunkEndIdx.clear();
         partitionLength = 0;
-        if (run != null) {
-            run.rewind();
-        }
+        partitionWriter.reset();
     }
 
     @Override
     protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
             throws HyracksDataException {
-        // save the frame. first one to memory, remaining ones to the run file
         boolean isFirstChunk = chunkEndIdx.isEmpty();
+        partitionWriter.nextFrame(frameId, frameBuffer);
         if (isFirstChunk) {
-            if (frameId != curFrameId) {
-                int pos = frameBuffer.position();
-                curFrame.ensureFrameSize(frameBuffer.capacity());
-                FrameUtils.copyAndFlip(frameBuffer, curFrame.getBuffer());
-                frameBuffer.position(pos);
-                curFrameId = frameId;
-            }
             chunkBeginIdx = tBeginIdx;
-        } else {
-            if (tBeginIdx != 0) {
-                throw new IllegalStateException(String.valueOf(tBeginIdx));
-            }
-            if (run == null) {
-                FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(getClass().getSimpleName());
-                run = new RunFileWriter(file, ctx.getIoManager());
-                run.open();
-            }
-            int pos = frameBuffer.position();
-            frameBuffer.position(0);
-            run.nextFrame(frameBuffer);
-            frameBuffer.position(pos);
-            runLastFrameId = frameId;
         }
-
         chunkEndIdx.add(tEndIdx);
         partitionLength += tEndIdx - tBeginIdx + 1;
     }
@@ -130,40 +97,32 @@
     protected void endPartitionImpl() throws HyracksDataException {
         runningAggInitPartition(partitionLength);
 
-        int nChunks = getPartitionChunkCount();
-        if (nChunks == 1) {
-            producePartitionTuples(0, null);
-        } else {
-            GeneratedRunFileReader reader = run.createReader();
-            reader.open();
-            try {
-                for (int chunkIdx = 0; chunkIdx < nChunks; chunkIdx++) {
-                    if (chunkIdx > 0) {
-                        reader.nextFrame(curFrame);
-                    }
-                    producePartitionTuples(chunkIdx, reader);
-                }
-                curFrameId = runLastFrameId;
-            } finally {
-                reader.close();
-            }
+        partitionReader.open();
+        for (int chunkIdx = 0, nChunks = getPartitionChunkCount(); chunkIdx < nChunks; chunkIdx++) {
+            IFrame chunkFrame = partitionReader.nextFrame(true);
+            producePartitionTuples(chunkIdx, chunkFrame);
         }
+        partitionReader.close();
     }
 
-    protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
-        tAccess.reset(curFrame.getBuffer());
-        produceTuples(tAccess, getTupleBeginIdx(chunkIdx), getTupleEndIdx(chunkIdx));
+    void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException {
+        tAccess.reset(chunkFrame.getBuffer());
+        produceTuples(tAccess, getTupleBeginIdx(chunkIdx), getTupleEndIdx(chunkIdx), tRef);
     }
 
-    int getPartitionChunkCount() {
+    final int getPartitionChunkCount() {
         return chunkEndIdx.size();
     }
 
-    int getTupleBeginIdx(int chunkIdx) {
+    final int getTupleBeginIdx(int chunkIdx) {
         return chunkIdx == 0 ? chunkBeginIdx : 0;
     }
 
-    int getTupleEndIdx(int chunkIdx) {
+    final int getTupleEndIdx(int chunkIdx) {
         return chunkEndIdx.get(chunkIdx);
     }
+
+    int getPartitionReaderSlotCount() {
+        return -1; // forward only reader by default
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
index 1b02fb1..8bb3147 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
@@ -34,18 +34,22 @@
 
     private static final long serialVersionUID = 1L;
 
+    final int memSizeInFrames;
+
     public WindowMaterializingRuntimeFactory(int[] partitionColumns,
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
-            int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories) {
+            int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories, int memSizeInFrames) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
                 projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+        this.memSizeInFrames = memSizeInFrames;
     }
 
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
         return new WindowMaterializingPushRuntime(partitionColumns, partitionComparatorFactories,
-                orderComparatorFactories, projectionList, runningAggOutColumns, runningAggFactories, ctx);
+                orderComparatorFactories, projectionList, runningAggOutColumns, runningAggFactories, ctx,
+                memSizeInFrames, sourceLoc);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
index cb4f534..aa9d402 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -19,27 +19,23 @@
 
 package org.apache.hyracks.algebricks.runtime.operators.win;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
 import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.DataUtils;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 /**
@@ -48,6 +44,14 @@
  */
 class WindowNestedPlansPushRuntime extends AbstractWindowNestedPlansPushRuntime {
 
+    private static final int PARTITION_POSITION_SLOT = 0;
+
+    private static final int FRAME_POSITION_SLOT = 1;
+
+    private static final int TMP_POSITION_SLOT = 2;
+
+    private static final int PARTITION_READER_SLOT_COUNT = TMP_POSITION_SLOT + 1;
+
     private final boolean frameValueExists;
 
     private final IScalarEvaluatorFactory[] frameValueEvalFactories;
@@ -106,14 +110,6 @@
 
     private final int frameMaxObjects;
 
-    private IFrame copyFrame2;
-
-    private IFrame runFrame;
-
-    private int runFrameChunkId;
-
-    private long runFrameSize;
-
     private FrameTupleAccessor tAccess2;
 
     private FrameTupleReference tRef2;
@@ -124,8 +120,6 @@
 
     private int tBeginIdxFrameStartGlobal;
 
-    private long readerPosFrameStartGlobal;
-
     WindowNestedPlansPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
             IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories,
@@ -134,9 +128,11 @@
             IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory,
             IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects, int[] projectionColumns,
             int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
-            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx,
+            int memSizeInFrames, SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
+                memSizeInFrames, sourceLoc);
         this.frameValueEvalFactories = frameValueEvalFactories;
         this.frameValueExists = frameValueEvalFactories != null && frameValueEvalFactories.length > 0;
         this.frameStartEvalFactories = frameStartEvalFactories;
@@ -158,7 +154,6 @@
     @Override
     protected void init() throws HyracksDataException {
         super.init();
-
         if (frameValueExists) {
             frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
             frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
@@ -183,9 +178,6 @@
             frameOffsetPointable = VoidPointable.FACTORY.createPointable();
             bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
         }
-
-        runFrame = new VSizeFrame(ctx);
-        copyFrame2 = new VSizeFrame(ctx);
         tAccess2 = new FrameTupleAccessor(inputRecordDesc);
         tRef2 = new FrameTupleReference();
     }
@@ -195,31 +187,22 @@
         super.beginPartitionImpl();
         chunkIdxFrameStartGlobal = -1;
         tBeginIdxFrameStartGlobal = -1;
-        readerPosFrameStartGlobal = -1;
-        runFrameChunkId = -1;
     }
 
     @Override
-    protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
-        boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
+    protected void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException {
+        partitionReader.savePosition(PARTITION_POSITION_SLOT);
 
-        long readerPos = -1;
         int nChunks = getPartitionChunkCount();
-        if (nChunks > 1) {
-            readerPos = reader.position();
-            if (chunkIdx == 0) {
-                ByteBuffer curFrameBuffer = curFrame.getBuffer();
-                int pos = curFrameBuffer.position();
-                copyFrame2.ensureFrameSize(curFrameBuffer.capacity());
-                FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer());
-                curFrameBuffer.position(pos);
-            }
-        }
+        boolean isFirstChunkInPartition = chunkIdx == 0;
 
-        tAccess.reset(curFrame.getBuffer());
+        tAccess.reset(chunkFrame.getBuffer());
         int tBeginIdx = getTupleBeginIdx(chunkIdx);
         int tEndIdx = getTupleEndIdx(chunkIdx);
+
         for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) {
+            boolean isFirstTupleInPartition = isFirstChunkInPartition && tIdx == tBeginIdx;
+
             tRef.reset(tAccess, tIdx);
 
             // running aggregates
@@ -245,34 +228,23 @@
 
             nestedAggInit();
 
+            boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
             int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0;
             int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1;
-            if (nChunks > 1) {
-                reader.seek(frameStartForward ? readerPosFrameStartGlobal : 0);
+
+            if (chunkIdxInnerStart < nChunks) {
+                if (frameStartForward && !isFirstTupleInPartition) {
+                    partitionReader.restorePosition(FRAME_POSITION_SLOT);
+                } else {
+                    partitionReader.rewind();
+                }
             }
 
             int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1;
-            long readerPosFrameStartLocal = -1;
 
             frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
-                long readerPosFrameInner;
-                IFrame frameInner;
-                if (chunkIdxInner == 0) {
-                    // first chunk's frame is always in memory
-                    frameInner = chunkIdx == 0 ? curFrame : copyFrame2;
-                    readerPosFrameInner = 0;
-                } else {
-                    readerPosFrameInner = reader.position();
-                    if (runFrameChunkId == chunkIdxInner) {
-                        // runFrame has this chunk, so just advance the reader
-                        reader.seek(readerPosFrameInner + runFrameSize);
-                    } else {
-                        reader.nextFrame(runFrame);
-                        runFrameSize = reader.position() - readerPosFrameInner;
-                        runFrameChunkId = chunkIdxInner;
-                    }
-                    frameInner = runFrame;
-                }
+                partitionReader.savePosition(TMP_POSITION_SLOT);
+                IFrame frameInner = partitionReader.nextFrame(false);
                 tAccess2.reset(frameInner.getBuffer());
 
                 int tBeginIdxInner;
@@ -294,17 +266,19 @@
                                 // skip if value < start
                                 continue;
                             }
+                            // inside the frame
                             if (chunkIdxFrameStartLocal < 0) {
-                                // save position of the first tuple that matches the frame start.
-                                // we'll continue from it in the next frame iteration
+                                // save position of the first tuple in this frame
+                                // will continue from it in the next frame iteration
                                 chunkIdxFrameStartLocal = chunkIdxInner;
                                 tBeginIdxFrameStartLocal = tIdxInner;
-                                readerPosFrameStartLocal = readerPosFrameInner;
+                                partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT);
                             }
                         }
                         if (frameEndExists
                                 && frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
-                            // skip and exit if value > end
+                            // value > end => beyond the frame end
+                            // exit the frame loop
                             break frame_loop;
                         }
                     }
@@ -331,27 +305,22 @@
                 }
             }
 
-            nestedAggOutputFinalResult(tupleBuilder);
-            appendToFrameFromTupleBuilder(tupleBuilder);
-
             if (frameStartIsMonotonic) {
-                frameStartForward = true;
                 if (chunkIdxFrameStartLocal >= 0) {
                     chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal;
                     tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal;
-                    readerPosFrameStartGlobal = readerPosFrameStartLocal;
                 } else {
-                    // frame start not found, set start beyond the last chunk
+                    // frame start not found, set it beyond the last chunk
                     chunkIdxFrameStartGlobal = nChunks;
                     tBeginIdxFrameStartGlobal = 0;
-                    readerPosFrameStartGlobal = 0;
                 }
             }
+
+            nestedAggOutputFinalResult(tupleBuilder);
+            appendToFrameFromTupleBuilder(tupleBuilder);
         }
 
-        if (nChunks > 1) {
-            reader.seek(readerPos);
-        }
+        partitionReader.restorePosition(PARTITION_POSITION_SLOT);
     }
 
     private boolean isExcluded() throws HyracksDataException {
@@ -368,4 +337,9 @@
         }
         return true;
     }
+
+    @Override
+    protected int getPartitionReaderSlotCount() {
+        return PARTITION_READER_SLOT_COUNT;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
index e550d65..fe7e93f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
@@ -19,21 +19,17 @@
 
 package org.apache.hyracks.algebricks.runtime.operators.win;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 /**
@@ -41,7 +37,15 @@
  * as well as regular aggregates (in nested plans) over accumulating window frames
  * (unbounded preceding to current row or N following).
  */
-class WindowNestedPlansRunningPushRuntime extends AbstractWindowNestedPlansPushRuntime {
+final class WindowNestedPlansRunningPushRuntime extends AbstractWindowNestedPlansPushRuntime {
+
+    private static final int PARTITION_POSITION_SLOT = 0;
+
+    private static final int FRAME_POSITION_SLOT = 1;
+
+    private static final int TMP_POSITION_SLOT = 2;
+
+    private static final int PARTITION_READER_SLOT_COUNT = TMP_POSITION_SLOT + 1;
 
     private final IScalarEvaluatorFactory[] frameValueEvalFactories;
 
@@ -61,14 +65,6 @@
 
     private final int frameMaxObjects;
 
-    private IFrame copyFrame2;
-
-    private IFrame runFrame;
-
-    private int runFrameChunkId;
-
-    private long runFrameSize;
-
     private FrameTupleAccessor tAccess2;
 
     private FrameTupleReference tRef2;
@@ -77,8 +73,6 @@
 
     private int tBeginIdxFrameEndGlobal;
 
-    private long readerPosFrameEndGlobal;
-
     private int toWrite;
 
     WindowNestedPlansRunningPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
@@ -86,9 +80,11 @@
             IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories,
             int frameMaxObjects, int[] projectionColumns, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
-            WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+            WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx, int memSizeInFrames,
+            SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
+                memSizeInFrames, sourceLoc);
         this.frameValueEvalFactories = frameValueEvalFactories;
         this.frameEndEvalFactories = frameEndEvalFactories;
         this.frameValueComparatorFactories = frameValueComparatorFactories;
@@ -98,15 +94,11 @@
     @Override
     protected void init() throws HyracksDataException {
         super.init();
-
         frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
         frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
         frameValuePointables = createPointables(frameValueEvalFactories.length);
         frameEndEvals = createEvaluators(frameEndEvalFactories, ctx);
         frameEndPointables = createPointables(frameEndEvalFactories.length);
-
-        runFrame = new VSizeFrame(ctx);
-        copyFrame2 = new VSizeFrame(ctx);
         tAccess2 = new FrameTupleAccessor(inputRecordDesc);
         tRef2 = new FrameTupleReference();
     }
@@ -117,32 +109,25 @@
         nestedAggInit();
         chunkIdxFrameEndGlobal = 0;
         tBeginIdxFrameEndGlobal = -1;
-        readerPosFrameEndGlobal = 0;
-        runFrameChunkId = -1;
         toWrite = frameMaxObjects;
     }
 
     @Override
-    protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
-        long readerPos = -1;
+    protected void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException {
+        partitionReader.savePosition(PARTITION_POSITION_SLOT);
+
         int nChunks = getPartitionChunkCount();
-        if (nChunks > 1) {
-            readerPos = reader.position();
-            if (chunkIdx == 0) {
-                ByteBuffer curFrameBuffer = curFrame.getBuffer();
-                int pos = curFrameBuffer.position();
-                copyFrame2.ensureFrameSize(curFrameBuffer.capacity());
-                FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer());
-                curFrameBuffer.position(pos);
-            }
-        }
+        boolean isFirstChunkInPartition = chunkIdx == 0;
+        boolean isLastChunkInPartition = chunkIdx == nChunks - 1;
 
-        boolean isLastChunk = chunkIdx == nChunks - 1;
-
-        tAccess.reset(curFrame.getBuffer());
+        tAccess.reset(chunkFrame.getBuffer());
         int tBeginIdx = getTupleBeginIdx(chunkIdx);
         int tEndIdx = getTupleEndIdx(chunkIdx);
+
         for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) {
+            boolean isFirstTupleInPartition = isFirstChunkInPartition && tIdx == tBeginIdx;
+            boolean isLastTupleInPartition = isLastChunkInPartition && tIdx == tEndIdx;
+
             tRef.reset(tAccess, tIdx);
 
             // running aggregates
@@ -153,40 +138,28 @@
 
             int chunkIdxInnerStart = chunkIdxFrameEndGlobal;
             int tBeginIdxInnerStart = tBeginIdxFrameEndGlobal;
-            if (nChunks > 1) {
-                reader.seek(readerPosFrameEndGlobal);
+
+            if (chunkIdxInnerStart < nChunks) {
+                if (!isFirstTupleInPartition) {
+                    partitionReader.restorePosition(FRAME_POSITION_SLOT);
+                } else {
+                    partitionReader.rewind();
+                }
             }
 
             int chunkIdxFrameEndLocal = -1, tBeginIdxFrameEndLocal = -1;
-            long readerPosFrameEndLocal = -1;
 
             frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
-                long readerPosFrameInner;
-                IFrame frameInner;
-                if (chunkIdxInner == 0) {
-                    // first chunk's frame is always in memory
-                    frameInner = chunkIdx == 0 ? curFrame : copyFrame2;
-                    readerPosFrameInner = 0;
-                } else {
-                    readerPosFrameInner = reader.position();
-                    if (runFrameChunkId == chunkIdxInner) {
-                        // runFrame has this chunk, so just advance the reader
-                        reader.seek(readerPosFrameInner + runFrameSize);
-                    } else {
-                        reader.nextFrame(runFrame);
-                        runFrameSize = reader.position() - readerPosFrameInner;
-                        runFrameChunkId = chunkIdxInner;
-                    }
-                    frameInner = runFrame;
-                }
+                partitionReader.savePosition(TMP_POSITION_SLOT);
+                IFrame frameInner = partitionReader.nextFrame(false);
                 tAccess2.reset(frameInner.getBuffer());
 
                 int tBeginIdxInner;
-                if (tBeginIdxInnerStart < 0) {
-                    tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
-                } else {
+                if (tBeginIdxInnerStart >= 0) {
                     tBeginIdxInner = tBeginIdxInnerStart;
                     tBeginIdxInnerStart = -1;
+                } else {
+                    tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
                 }
                 int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
 
@@ -194,14 +167,14 @@
                     tRef2.reset(tAccess2, tIdxInner);
 
                     evaluate(frameValueEvals, tRef2, frameValuePointables);
+
                     if (frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
-                        // save position of the tuple that matches the frame end.
-                        // we'll continue from it in the next outer iteration
+                        // value > end => beyond the frame end
+                        // save position of the current tuple, will continue from it in the next outer iteration
                         chunkIdxFrameEndLocal = chunkIdxInner;
                         tBeginIdxFrameEndLocal = tIdxInner;
-                        readerPosFrameEndLocal = readerPosFrameInner;
-
-                        // skip and exit if value > end
+                        partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT);
+                        // exit the frame loop
                         break frame_loop;
                     }
 
@@ -213,28 +186,28 @@
                 }
             }
 
-            boolean isLastTuple = isLastChunk && tIdx == tEndIdx;
-            if (isLastTuple) {
+            if (chunkIdxFrameEndLocal >= 0) {
+                chunkIdxFrameEndGlobal = chunkIdxFrameEndLocal;
+                tBeginIdxFrameEndGlobal = tBeginIdxFrameEndLocal;
+            } else {
+                // frame end not found, set it beyond the last chunk
+                chunkIdxFrameEndGlobal = nChunks;
+                tBeginIdxFrameEndGlobal = 0;
+            }
+
+            if (isLastTupleInPartition) {
                 nestedAggOutputFinalResult(tupleBuilder);
             } else {
                 nestedAggOutputPartialResult(tupleBuilder);
             }
             appendToFrameFromTupleBuilder(tupleBuilder);
-
-            if (chunkIdxFrameEndLocal >= 0) {
-                chunkIdxFrameEndGlobal = chunkIdxFrameEndLocal;
-                tBeginIdxFrameEndGlobal = tBeginIdxFrameEndLocal;
-                readerPosFrameEndGlobal = readerPosFrameEndLocal;
-            } else {
-                // could not find the end, set beyond the last chunk
-                chunkIdxFrameEndGlobal = nChunks;
-                tBeginIdxFrameEndGlobal = 0;
-                readerPosFrameEndGlobal = 0;
-            }
         }
 
-        if (nChunks > 1) {
-            reader.seek(readerPos);
-        }
+        partitionReader.restorePosition(PARTITION_POSITION_SLOT);
+    }
+
+    @Override
+    protected int getPartitionReaderSlotCount() {
+        return PARTITION_READER_SLOT_COUNT;
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
index ddeaf2b..53692d1 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
@@ -50,10 +50,10 @@
             IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories,
             int frameMaxObjects, int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
-            WindowAggregatorDescriptorFactory nestedAggFactory) {
+            WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
                 projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
-                nestedAggFactory);
+                nestedAggFactory, memSizeInFrames);
         this.frameValueEvalFactories = frameValueEvalFactories;
         this.frameValueComparatorFactories = frameValueComparatorFactories;
         this.frameEndEvalFactories = frameEndEvalFactories;
@@ -65,7 +65,7 @@
         return new WindowNestedPlansRunningPushRuntime(partitionColumns, partitionComparatorFactories,
                 orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories, frameEndEvalFactories,
                 frameMaxObjects, projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
-                nestedAggFactory, ctx);
+                nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
index f754b91..4a7c837 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
@@ -68,10 +68,10 @@
             IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects,
             int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
-            WindowAggregatorDescriptorFactory nestedAggFactory) {
+            WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
                 projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
-                nestedAggFactory);
+                nestedAggFactory, memSizeInFrames);
         this.frameValueEvalFactories = frameValueEvalFactories;
         this.frameValueComparatorFactories = frameValueComparatorFactories;
         this.frameStartEvalFactories = frameStartEvalFactories;
@@ -92,7 +92,7 @@
                 frameStartEvalFactories, frameStartIsMonotonic, frameEndEvalFactories, frameExcludeEvalFactories,
                 frameExcludeNegationStartIdx, frameExcludeComparatorFactories, frameOffsetEvalFactory,
                 binaryIntegerInspectorFactory, frameMaxObjects, projectionList, runningAggOutColumns,
-                runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+                runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
index b25a36c..d97c855 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.utils.TupleUtils;
@@ -55,9 +56,11 @@
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int frameMaxObjects, int[] projectionColumns,
             int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
-            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx,
+            int memSizeInFrames, SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
+                memSizeInFrames, sourceLoc);
         this.frameMaxObjects = frameMaxObjects;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
index 0f7d9cf..b2dfbca 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
@@ -41,10 +41,10 @@
             IBinaryComparatorFactory[] orderComparatorFactories, int frameMaxObjects,
             int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
-            WindowAggregatorDescriptorFactory nestedAggFactory) {
+            WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
                 projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
-                nestedAggFactory);
+                nestedAggFactory, memSizeInFrames);
         this.frameMaxObjects = frameMaxObjects;
     }
 
@@ -52,7 +52,7 @@
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
         return new WindowNestedPlansUnboundedPushRuntime(partitionColumns, partitionComparatorFactories,
                 orderComparatorFactories, frameMaxObjects, projectionList, runningAggOutColumns, runningAggFactories,
-                nestedAggOutSchemaSize, nestedAggFactory, ctx);
+                nestedAggOutSchemaSize, nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionReader.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionReader.java
new file mode 100644
index 0000000..208effc
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+interface WindowPartitionReader {
+
+    void open() throws HyracksDataException;
+
+    IFrame nextFrame(boolean primaryScan) throws HyracksDataException;
+
+    void close() throws HyracksDataException;
+
+    // position manipulation
+
+    void rewind();
+
+    void savePosition(int slotNo);
+
+    void restorePosition(int slotNo);
+
+    void copyPosition(int slotFrom, int slotTo);
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionWriter.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionWriter.java
new file mode 100644
index 0000000..b3eb317
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionWriter.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+
+final class WindowPartitionWriter {
+
+    private final IHyracksTaskContext ctx;
+
+    private final String fileNamePrefix;
+
+    private final SourceLocation sourceLoc;
+
+    private final IFrame[] writerFrames;
+
+    private int writerFrameCount;
+
+    private long writerFirstFrameId;
+
+    private long writerLastFrameId;
+
+    private RunFileWriter fileWriter;
+
+    private final AbstractWindowPartitionReader partitionReader;
+
+    WindowPartitionWriter(IHyracksTaskContext ctx, int memSizeInFrames, String fileNamePrefix,
+            int readerPositionStoreSize, SourceLocation sourceLoc) throws HyracksDataException {
+        this.ctx = ctx;
+        this.fileNamePrefix = fileNamePrefix;
+        this.sourceLoc = sourceLoc;
+        partitionReader = readerPositionStoreSize < 1 ? new WindowPartitionForwardReader()
+                : new WindowPartitionSeekableReader(readerPositionStoreSize);
+        int writerFrameBudget = memSizeInFrames - partitionReader.getReservedFrameCount();
+        if (writerFrameBudget < 1) {
+            throw new IllegalArgumentException(String.valueOf(memSizeInFrames));
+        }
+        writerFrames = new IFrame[writerFrameBudget];
+        // Allocate one writer frame here. Remaining frames will be allocated lazily while writing
+        allocateFrames(writerFrames, 1);
+        writerFirstFrameId = writerLastFrameId = -1;
+    }
+
+    void close() throws HyracksDataException {
+        try {
+            partitionReader.closeFileReader();
+        } finally {
+            if (fileWriter != null) {
+                fileWriter.close();
+            }
+        }
+    }
+
+    void reset() {
+        writerFrameCount = 0;
+        if (fileWriter != null) {
+            fileWriter.rewind();
+        }
+    }
+
+    void nextFrame(long frameId, ByteBuffer frameBuffer) throws HyracksDataException {
+        if (frameId < 0) {
+            throw new IllegalArgumentException(String.valueOf(frameId));
+        }
+        if (writerFrameCount == 0) {
+            if (writerFirstFrameId != frameId) {
+                copyToFrame(frameBuffer, writerFrames[0]);
+                writerFirstFrameId = frameId;
+            }
+        } else if (writerFrameCount < writerFrames.length) {
+            IFrame writerFrame = writerFrames[writerFrameCount];
+            if (writerFrame == null) {
+                writerFrames[writerFrameCount] = writerFrame = new VSizeFrame(ctx);
+            }
+            copyToFrame(frameBuffer, writerFrame);
+        } else {
+            if (fileWriter == null) {
+                FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(fileNamePrefix);
+                fileWriter = new RunFileWriter(file, ctx.getIoManager());
+                fileWriter.open();
+            }
+            int pos = frameBuffer.position();
+            frameBuffer.position(0);
+            fileWriter.nextFrame(frameBuffer);
+            frameBuffer.position(pos);
+        }
+        writerLastFrameId = frameId;
+        writerFrameCount++;
+    }
+
+    WindowPartitionReader getReader() {
+        return partitionReader;
+    }
+
+    private void allocateFrames(IFrame[] outFrames, int count) throws HyracksDataException {
+        for (int i = 0; i < count; i++) {
+            outFrames[i] = new VSizeFrame(ctx);
+        }
+    }
+
+    private static void copyToFrame(ByteBuffer fromBuffer, IFrame toFrame) throws HyracksDataException {
+        toFrame.ensureFrameSize(fromBuffer.capacity());
+        int fromPosition = fromBuffer.position();
+        FrameUtils.copyAndFlip(fromBuffer, toFrame.getBuffer());
+        fromBuffer.position(fromPosition);
+    }
+
+    private static <T> void swap(T[] array1, int index1, T[] array2, int index2) {
+        T item1 = array1[index1];
+        array1[index1] = array2[index2];
+        array2[index2] = item1;
+    }
+
+    private abstract class AbstractWindowPartitionReader implements WindowPartitionReader {
+
+        int readerFrameIdx = -1;
+
+        GeneratedRunFileReader fileReader;
+
+        @Override
+        public void open() throws HyracksDataException {
+            if (readerFrameIdx >= 0) {
+                throw new IllegalStateException(String.valueOf(readerFrameIdx));
+            }
+            readerFrameIdx = 0;
+
+            if (writerFrameCount > writerFrames.length) {
+                openFileReader();
+            }
+        }
+
+        @Override
+        public final void close() throws HyracksDataException {
+            if (readerFrameIdx != writerFrameCount) {
+                throw new IllegalStateException();
+            }
+
+            // closeImpl() must guarantee that first writer frame will contain content of the last partition frame
+            closeImpl();
+            writerFirstFrameId = writerLastFrameId;
+
+            readerFrameIdx = -1;
+
+            if (writerFrameCount > writerFrames.length) {
+                closeFileReader();
+            }
+        }
+
+        void openFileReader() throws HyracksDataException {
+            if (fileReader != null) {
+                throw new IllegalStateException();
+            }
+            fileReader = fileWriter.createReader();
+            fileReader.open();
+        }
+
+        void closeFileReader() throws HyracksDataException {
+            GeneratedRunFileReader r = fileReader;
+            if (r != null) {
+                fileReader = null;
+                r.close();
+            }
+        }
+
+        void readFromFileReader(IFrame outFrame) throws HyracksDataException {
+            if (!fileReader.nextFrame(outFrame)) {
+                throw HyracksDataException.create(ErrorCode.EOF, sourceLoc);
+            }
+        }
+
+        @Override
+        public final IFrame nextFrame(boolean primaryScan) throws HyracksDataException {
+            if (readerFrameIdx < 0) {
+                throw new IllegalStateException();
+            }
+            if (readerFrameIdx >= writerFrameCount) {
+                throw HyracksDataException.create(ErrorCode.EOF, sourceLoc);
+            }
+            IFrame frame = nextFrameImpl(primaryScan);
+            readerFrameIdx++;
+            return frame;
+        }
+
+        abstract void closeImpl() throws HyracksDataException;
+
+        abstract IFrame nextFrameImpl(boolean primaryScan) throws HyracksDataException;
+
+        abstract int getReservedFrameCount();
+    }
+
+    private final class WindowPartitionForwardReader extends AbstractWindowPartitionReader {
+
+        @Override
+        IFrame nextFrameImpl(boolean primaryScan) throws HyracksDataException {
+            if (!primaryScan) {
+                throw new IllegalArgumentException();
+            }
+            if (readerFrameIdx < writerFrames.length) {
+                return writerFrames[readerFrameIdx];
+            } else {
+                IFrame writerFrame0 = writerFrames[0];
+                readFromFileReader(writerFrame0);
+                return writerFrame0;
+            }
+        }
+
+        @Override
+        void closeImpl() {
+            int endFrameIdx = readerFrameIdx - 1;
+            if (endFrameIdx > 0 && endFrameIdx < writerFrames.length) {
+                // last partition frame is in writerFrames -> make it the first one
+                swap(writerFrames, 0, writerFrames, endFrameIdx);
+            }
+        }
+
+        @Override
+        public void savePosition(int slotNo) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void copyPosition(int slotFrom, int slotTo) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void restorePosition(int slotNo) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void rewind() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        int getReservedFrameCount() {
+            return 0;
+        }
+    }
+
+    private final class WindowPartitionSeekableReader extends AbstractWindowPartitionReader {
+
+        private final IFrame[] fileFrames;
+
+        private final long[] fileFrameIdxs;
+
+        private final long[] fileFrameSizes;
+
+        private final long[] filePositionStore;
+
+        private final int[] readerFrameIdxStore;
+
+        private WindowPartitionSeekableReader(int positionStoreSize) throws HyracksDataException {
+            fileFrames = new IFrame[2]; // run file frames: one for primary scan, another for non-primary
+            allocateFrames(fileFrames, fileFrames.length);
+            fileFrameIdxs = new long[fileFrames.length];
+            fileFrameSizes = new long[fileFrames.length];
+            filePositionStore = new long[positionStoreSize];
+            readerFrameIdxStore = new int[positionStoreSize];
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            super.open();
+            Arrays.fill(fileFrameIdxs, -1);
+            Arrays.fill(filePositionStore, -1);
+            Arrays.fill(readerFrameIdxStore, -1);
+        }
+
+        @Override
+        IFrame nextFrameImpl(boolean primaryScan) throws HyracksDataException {
+            if (readerFrameIdx < writerFrames.length) {
+                return writerFrames[readerFrameIdx];
+            } else {
+                int fileFrameSlot = primaryScan ? 0 : 1;
+                IFrame fileFrameRef = fileFrames[fileFrameSlot];
+                long filePosition = fileReader.position();
+                if (readerFrameIdx == fileFrameIdxs[fileFrameSlot]) {
+                    fileReader.seek(filePosition + fileFrameSizes[fileFrameSlot]);
+                } else {
+                    readFromFileReader(fileFrameRef);
+                    fileFrameSizes[fileFrameSlot] = fileReader.position() - filePosition;
+                    fileFrameIdxs[fileFrameSlot] = readerFrameIdx;
+                }
+                return fileFrameRef;
+            }
+        }
+
+        @Override
+        public void closeImpl() {
+            int endFrameIdx = readerFrameIdx - 1;
+            if (endFrameIdx >= writerFrames.length) {
+                // last partition frame was in the run file -> get contents from the file frame
+                swap(writerFrames, 0, fileFrames, 0);
+            } else if (endFrameIdx > 0) {
+                // last partition frame is in writerFrames -> make it the first one
+                swap(writerFrames, 0, writerFrames, endFrameIdx);
+            }
+        }
+
+        @Override
+        public void savePosition(int slotNo) {
+            readerFrameIdxStore[slotNo] = readerFrameIdx;
+            filePositionStore[slotNo] = fileReader != null ? fileReader.position() : 0;
+        }
+
+        @Override
+        public void copyPosition(int slotFrom, int slotTo) {
+            readerFrameIdxStore[slotTo] = readerFrameIdxStore[slotFrom];
+            filePositionStore[slotTo] = filePositionStore[slotFrom];
+        }
+
+        @Override
+        public void restorePosition(int slotNo) {
+            seek(readerFrameIdxStore[slotNo], filePositionStore[slotNo]);
+        }
+
+        @Override
+        public void rewind() {
+            seek(0, 0);
+        }
+
+        private void seek(int readerFrameIdx, long filePosition) {
+            this.readerFrameIdx = readerFrameIdx;
+            if (fileReader != null) {
+                fileReader.seek(filePosition);
+            }
+        }
+
+        @Override
+        int getReservedFrameCount() {
+            return fileFrames.length;
+        }
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
index bf71ed9..f7f1a25 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 /**
  * Runtime for window operators that evaluates running aggregates without partition materialization.
@@ -33,9 +34,10 @@
 
     WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
-            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) {
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx,
+            SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, ctx);
+                runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
     }
 
     @Override
@@ -47,7 +49,7 @@
     protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
             throws HyracksDataException {
         tAccess.reset(frameBuffer);
-        produceTuples(tAccess, tBeginIdx, tEndIdx);
+        produceTuples(tAccess, tBeginIdx, tEndIdx, tRef);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
index ded399f..2d1cdde 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
@@ -43,7 +43,7 @@
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
         return new WindowSimplePushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
-                projectionList, runningAggOutColumns, runningAggFactories, ctx);
+                projectionList, runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index d6860f7..4ff0df3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -153,6 +153,7 @@
     public static final int NO_RANGEMAP_PRODUCED = 117;
     public static final int RANGEMAP_NOT_FOUND = 118;
     public static final int UNSUPPORTED_WINDOW_SPEC = 119;
+    public static final int EOF = 120;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index e95bf76..5c9863c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -136,6 +136,7 @@
 117 = No range map produced for parallel sort
 118 = Range map was not found for parallel sort
 119 = Unsupported window specification: PARTITION BY %1$s, ORDER BY %2$s
+120 = End of file
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index 03fdb49..50cacb2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -73,7 +73,7 @@
 
         int readLength = ioManager.syncRead(handle, readPtr, frame.getBuffer());
         if (readLength <= 0) {
-            throw new HyracksDataException("Premature end of file");
+            throw HyracksDataException.create(ErrorCode.EOF);
         }
         readPtr += readLength;
         frame.ensureFrameSize(frame.getMinSize() * FrameHelper.deserializeNumOfMinFrame(frame.getBuffer()));
@@ -81,7 +81,7 @@
             if (readPtr < size) {
                 readLength = ioManager.syncRead(handle, readPtr, frame.getBuffer());
                 if (readLength < 0) {
-                    throw new HyracksDataException("Premature end of file");
+                    throw HyracksDataException.create(ErrorCode.EOF);
                 }
                 readPtr += readLength;
             }