[NO ISSUE][RT] Window operator runtime optimization

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Runtime optimization for window operator with monotonic
  frame start expression. In this case continue scanning
  from the beginning of the frame that was found in the
  previous iteration
- Allow inlining variables into window operator expressions
  except PARTITION BY, ORDER BY and frame value expressions

Change-Id: I65bed4092f4fd3622f1525b26ce25e2ac07d7538
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3135
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index 6749bf9..d1ce865 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 
 public class AnalysisUtil {
@@ -129,6 +130,39 @@
         return new Pair<>(dataverseName, datasetName);
     }
 
+    /**
+     * Checks whether frame boundary expression is a monotonically non-descreasing function over a frame value variable
+     */
+    public static boolean isWindowFrameBoundaryMonotonic(List<Mutable<ILogicalExpression>> frameBoundaryExprList,
+            List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList) {
+        if (frameValueExprList.size() != 1) {
+            return false;
+        }
+        ILogicalExpression frameValueExpr = frameValueExprList.get(0).second.getValue();
+        if (frameValueExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+            return false;
+        }
+        if (frameBoundaryExprList.size() != 1) {
+            return false;
+        }
+        ILogicalExpression frameStartExpr = frameBoundaryExprList.get(0).getValue();
+        switch (frameStartExpr.getExpressionTag()) {
+            case CONSTANT:
+                return true;
+            case VARIABLE:
+                return frameStartExpr.equals(frameValueExpr);
+            case FUNCTION_CALL:
+                AbstractFunctionCallExpression frameStartCallExpr = (AbstractFunctionCallExpression) frameStartExpr;
+                FunctionIdentifier fi = frameStartCallExpr.getFunctionIdentifier();
+                return (BuiltinFunctions.NUMERIC_ADD.equals(fi) || BuiltinFunctions.NUMERIC_SUBTRACT.equals(fi))
+                        && frameStartCallExpr.getArguments().get(0).getValue().equals(frameValueExpr)
+                        && frameStartCallExpr.getArguments().get(1).getValue()
+                                .getExpressionTag() == LogicalExpressionTag.CONSTANT;
+            default:
+                throw new IllegalStateException(String.valueOf(frameStartExpr.getExpressionTag()));
+        }
+    }
+
     private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<>();
 
     static {
@@ -136,5 +170,4 @@
         fieldAccessFunctions.add(BuiltinFunctions.GET_HANDLE);
         fieldAccessFunctions.add(BuiltinFunctions.TYPE_OF);
     }
-
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index cd75c1e..ce9fd03 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.optimizer.base.AnalysisUtil;
 import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
 import org.apache.asterix.optimizer.rules.am.BTreeJobGenParams;
 import org.apache.commons.lang3.mutable.Mutable;
@@ -380,7 +381,8 @@
                 }
             }
         }
-
-        return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns);
+        boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(),
+                winOp.getFrameValueExpressions());
+        return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic);
     }
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
new file mode 100644
index 0000000..47f2ce9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Tests runtime optimizations of window functions
+ * Expected Res : SUCCESS
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create function q1_sum_1_preceding_1_following(N) {
+  from
+    range(1, N) x
+  let
+    result_expected = 3 * x - (case x when N then x + 1 else 0 end),
+    result_actual = sum(x) over (order by x range between 1 preceding and 1 following),
+    result_delta = result_expected - result_actual
+  select
+    min(result_delta) min_delta,
+    max(result_delta) max_delta
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
new file mode 100644
index 0000000..2d561b9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Test window operator with monotonic frame start expression
+ *              : on a dataset that fits into one physical frame
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+q1_sum_1_preceding_1_following(10);
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
new file mode 100644
index 0000000..fec6158
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Test window operator with monotonic frame start expression
+ *              : on a dataset that spans several physical frames
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+q1_sum_1_preceding_1_following(10000);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
new file mode 100644
index 0000000..84b5234
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Test window operator with monotonic frame start expression
+ *              : on dataset that spans several physical frames with frame that spans several physical frames
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+with N as 10000, W as 5000
+
+from (
+  from
+    range(1, N) x
+  select value
+    sum(x) over (order by x range between W preceding and W following)
+) v
+select value sum(v)
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
new file mode 100644
index 0000000..8a4374f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Test window operator with monotonic frame start expression
+ *              : on a dataset that spans several physical frames
+ *              : with a frame that starts before current physical frame
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+with N as 10000, W as 5000
+
+from (
+  from
+    range(1, N) x
+  select value
+    sum(x) over (order by x range between W + 2 preceding and W preceding)
+) v
+select value sum(v)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
new file mode 100644
index 0000000..91c0a31
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Test window operator with monotonic frame start expression
+ *              : on a dataset that spans several physical frames
+ *              : with a frame that starts after current physical frame
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+with N as 10000, W as 5000
+
+from (
+  from
+    range(1, N) x
+  select value
+    sum(x) over (order by x range between W following and W + 2 following)
+) v
+select value sum(v)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm
new file mode 100644
index 0000000..6115ead
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm
@@ -0,0 +1 @@
+{ "min_delta": 0, "max_delta": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm
new file mode 100644
index 0000000..6115ead
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm
@@ -0,0 +1 @@
+{ "min_delta": 0, "max_delta": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm
new file mode 100644
index 0000000..76a77d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm
@@ -0,0 +1 @@
+375062502500
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm
new file mode 100644
index 0000000..77dda1e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm
@@ -0,0 +1 @@
+37492501
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm
new file mode 100644
index 0000000..a25255a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm
@@ -0,0 +1 @@
+112492496
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0c8ac6a..455b3ea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -9311,6 +9311,11 @@
         <output-dir compare="Text">win_opt_01</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="window">
+      <compilation-unit name="win_opt_02">
+        <output-dir compare="Text">win_opt_02</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="writers">
     <test-case FilePath="writers">
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
index 8f563b3..bdfdac8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
@@ -40,9 +40,12 @@
 /**
  * Window operator evaluates window functions. It has the following components:
  * <ul>
- * <li>{@link #partitionExpressions} - define how input data must be partitioned</li>
- * <li>{@link #orderExpressions} - define how data inside these partitions must be ordered</li>
- * <li>{@link #frameValueExpressions} - value expressions for comparing against frame start / end boundaries and frame exclusion</li>
+ * <li>{@link #partitionExpressions} - define how input data must be partitioned.
+ *     Each must be a variable reference</li>
+ * <li>{@link #orderExpressions} - define how data inside these partitions must be ordered.
+ *     Each must be a variable reference</li>
+ * <li>{@link #frameValueExpressions} - value expressions for comparing against frame start / end boundaries and frame exclusion.
+ *     Each must be a variable reference</li>
  * <li>{@link #frameStartExpressions} - frame start boundary</li>
  * <li>{@link #frameEndExpressions} - frame end boundary</li>
  * <li>{@link #frameExcludeExpressions} - define values to be excluded from the frame</li>
@@ -217,15 +220,27 @@
 
     @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        return acceptExpressionTransform(visitor, true);
+    }
+
+    /**
+     * Allows performing expression transformation only on a subset of this operator's expressions
+     * @param visitor transforming visitor
+     * @param visitVarRefRequiringExprs whether to visit variable reference requiring expressions, or not
+     */
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor,
+            boolean visitVarRefRequiringExprs) throws AlgebricksException {
         boolean mod = false;
-        for (Mutable<ILogicalExpression> expr : partitionExpressions) {
-            mod |= visitor.transform(expr);
-        }
-        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) {
-            mod |= visitor.transform(p.second);
-        }
-        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : frameValueExpressions) {
-            mod |= visitor.transform(p.second);
+        if (visitVarRefRequiringExprs) {
+            for (Mutable<ILogicalExpression> expr : partitionExpressions) {
+                mod |= visitor.transform(expr);
+            }
+            for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) {
+                mod |= visitor.transform(p.second);
+            }
+            for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : frameValueExpressions) {
+                mod |= visitor.transform(p.second);
+            }
         }
         for (Mutable<ILogicalExpression> expr : frameStartExpressions) {
             mod |= visitor.transform(expr);
@@ -305,4 +320,14 @@
             expr.getValue().getUsedVariables(vars);
         }
     }
+
+    /**
+     * Only the following expressions require variable references: {@link #partitionExpressions},
+     * {@link #orderExpressions}, and {@link #frameValueExpressions}, others do not.
+     * Use {@link #acceptExpressionTransform(ILogicalExpressionReferenceTransform, boolean)}
+     * to visit only non-requiring expressions.
+     */
+    public boolean requiresVariableReferenceExpressions() {
+        return false;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index c8168d1..2a8658d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -75,11 +75,14 @@
 
     private final List<OrderColumn> orderColumns;
 
+    private final boolean frameStartIsMonotonic;
+
     public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
-            List<OrderColumn> orderColumns) {
+            List<OrderColumn> orderColumns, boolean frameStartIsMonotonic) {
         this.partitionColumns = partitionColumns;
         this.partitionMaterialization = partitionMaterialization;
         this.orderColumns = orderColumns;
+        this.frameStartIsMonotonic = frameStartIsMonotonic;
     }
 
     @Override
@@ -217,10 +220,10 @@
             } else {
                 runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
                         orderComparatorFactories, frameValueExprEvalsAndComparators.first,
-                        frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameEndExprEvals,
-                        frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(),
-                        frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval,
-                        context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(),
+                        frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartIsMonotonic,
+                        frameEndExprEvals, frameExcludeExprEvalsAndComparators.first,
+                        winOp.getFrameExcludeNegationStartIdx(), frameExcludeExprEvalsAndComparators.second,
+                        frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(),
                         projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
                         aggregatorOutputSchemaSize, nestedAggFactory);
             }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 9d5cdeb..d521831 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -120,9 +120,9 @@
 
     public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException;
 
-    public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
+    public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T arg) throws AlgebricksException;
 
-    public R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
+    public R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T arg) throws AlgebricksException;
 
     public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
new file mode 100644
index 0000000..9350f95
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.visitors;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+
+/**
+ * This visitor performs expression transformation on each operator by calling
+ * {@link ILogicalOperator#acceptExpressionTransform(ILogicalExpressionReferenceTransform)}.
+ * Subclasses can override individual {@code visit*} methods to customize which expressions must be transformed
+ * based on the operator kind. This functionality is required in cases when only a subset of operator's expressions
+ * must be transformed.
+ *
+ * @see WindowOperator#acceptExpressionTransform(ILogicalExpressionReferenceTransform, boolean)
+ */
+public abstract class LogicalExpressionReferenceTransformVisitor
+        implements ILogicalOperatorVisitor<Boolean, ILogicalExpressionReferenceTransform> {
+
+    protected boolean visitOperator(ILogicalOperator op, ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        return op.acceptExpressionTransform(transform);
+    }
+
+    @Override
+    public Boolean visitAggregateOperator(AggregateOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitRunningAggregateOperator(RunningAggregateOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitGroupByOperator(GroupByOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitLimitOperator(LimitOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitInnerJoinOperator(InnerJoinOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitLeftOuterJoinOperator(LeftOuterJoinOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
+            ILogicalExpressionReferenceTransform arg) throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitOrderOperator(OrderOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitAssignOperator(AssignOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitSelectOperator(SelectOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitDelegateOperator(DelegateOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitProjectOperator(ProjectOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitReplicateOperator(ReplicateOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitSplitOperator(SplitOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitMaterializeOperator(MaterializeOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitScriptOperator(ScriptOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitSubplanOperator(SubplanOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitSinkOperator(SinkOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitUnionOperator(UnionAllOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitIntersectOperator(IntersectOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitUnnestOperator(UnnestOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitUnnestMapOperator(UnnestMapOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op,
+            ILogicalExpressionReferenceTransform arg) throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitDataScanOperator(DataSourceScanOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitDistinctOperator(DistinctOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitExchangeOperator(ExchangeOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitWriteOperator(WriteOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitDistributeResultOperator(DistributeResultOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitWriteResultOperator(WriteResultOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op,
+            ILogicalExpressionReferenceTransform arg) throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op,
+            ILogicalExpressionReferenceTransform arg) throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitTokenizeOperator(TokenizeOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitForwardOperator(ForwardOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
+    @Override
+    public Boolean visitWindowOperator(WindowOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
index dc9a11f..f072312 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
@@ -72,7 +72,7 @@
                 if (!op.requiresVariableReferenceExpressions()) {
                     inlineVisitor.setOperator(op);
                     inlineVisitor.setTargetVariable(entry.getKey());
-                    if (op.acceptExpressionTransform(inlineVisitor)) {
+                    if (op.accept(inlineVisitor, inlineVisitor)) {
                         modified = true;
                     }
                     inlineVisitor.setTargetVariable(null);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 729d6f9..2c95ce0 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -35,13 +35,14 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.visitors.LogicalExpressionReferenceTransformVisitor;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -113,7 +114,7 @@
         // Only inline variables in operators that can deal with arbitrary expressions.
         if (!op.requiresVariableReferenceExpressions()) {
             inlineVisitor.setOperator(op);
-            return op.acceptExpressionTransform(inlineVisitor);
+            return op.accept(inlineVisitor, inlineVisitor);
         }
         return false;
     }
@@ -199,7 +200,8 @@
         return modified;
     }
 
-    public static class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform {
+    public static class InlineVariablesVisitor extends LogicalExpressionReferenceTransformVisitor
+            implements ILogicalExpressionReferenceTransform {
 
         private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
         private final Set<LogicalVariable> liveVars = new HashSet<>();
@@ -227,9 +229,15 @@
         }
 
         @Override
+        public Boolean visitWindowOperator(WindowOperator op, ILogicalExpressionReferenceTransform arg)
+                throws AlgebricksException {
+            return op.acceptExpressionTransform(arg, false);
+        }
+
+        @Override
         public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
             ILogicalExpression e = exprRef.getValue();
-            switch (((AbstractLogicalExpression) e).getExpressionTag()) {
+            switch (e.getExpressionTag()) {
                 case VARIABLE:
                     return transformVariableReferenceExpression(exprRef,
                             ((VariableReferenceExpression) e).getVariableReference());
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
index 661bb8a..4e97d6c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
@@ -99,9 +99,8 @@
         boolean isFirstChunk = chunkEndIdx.isEmpty();
         if (isFirstChunk) {
             if (frameId != curFrameId) {
-                int nBlocks = FrameHelper.deserializeNumOfMinFrame(frameBuffer);
-                curFrame.ensureFrameSize(curFrame.getMinSize() * nBlocks);
                 int pos = frameBuffer.position();
+                curFrame.ensureFrameSize(frameBuffer.capacity());
                 FrameUtils.copyAndFlip(frameBuffer, curFrame.getBuffer());
                 frameBuffer.position(pos);
                 curFrameId = frameId;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
index e7daf11..565cbe6 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -26,7 +26,6 @@
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -34,15 +33,17 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.DataUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import org.apache.hyracks.storage.common.MultiComparator;
 
 /**
  * Runtime for window operators that performs partition materialization and can evaluate running aggregates
@@ -56,11 +57,11 @@
 
     private IScalarEvaluator[] frameValueEvals;
 
-    private IPointable[] frameValuePointables;
+    private PointableTupleReference frameValuePointables;
 
     private final IBinaryComparatorFactory[] frameValueComparatorFactories;
 
-    private IBinaryComparator[] frameValueComparators;
+    private MultiComparator frameValueComparators;
 
     private final boolean frameStartExists;
 
@@ -68,7 +69,9 @@
 
     private IScalarEvaluator[] frameStartEvals;
 
-    private IPointable[] frameStartPointables;
+    private PointableTupleReference frameStartPointables;
+
+    private final boolean frameStartIsMonotonic;
 
     private final boolean frameEndExists;
 
@@ -76,7 +79,7 @@
 
     private IScalarEvaluator[] frameEndEvals;
 
-    private IPointable[] frameEndPointables;
+    private PointableTupleReference frameEndPointables;
 
     private final boolean frameExcludeExists;
 
@@ -86,7 +89,7 @@
 
     private final int frameExcludeNegationStartIdx;
 
-    private IPointable[] frameExcludePointables;
+    private PointableTupleReference frameExcludePointables;
 
     private IPointable frameExcludePointable2;
 
@@ -116,18 +119,28 @@
 
     private IFrame runFrame;
 
+    private int runFrameChunkId;
+
+    private long runFrameSize;
+
     private FrameTupleAccessor tAccess2;
 
     private FrameTupleReference tRef2;
 
     private IBinaryIntegerInspector bii;
 
+    private int chunkIdxFrameStartGlobal;
+
+    private int tBeginIdxFrameStartGlobal;
+
+    private long readerPosFrameStartGlobal;
+
     WindowNestedPlansPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
             IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories,
-            IScalarEvaluatorFactory[] frameEndEvalFactories, IScalarEvaluatorFactory[] frameExcludeEvalFactories,
-            int frameExcludeNegationStartIdx, IBinaryComparatorFactory[] frameExcludeComparatorFactories,
-            IScalarEvaluatorFactory frameOffsetEvalFactory,
+            boolean frameStartIsMonotonic, IScalarEvaluatorFactory[] frameEndEvalFactories,
+            IScalarEvaluatorFactory[] frameExcludeEvalFactories, int frameExcludeNegationStartIdx,
+            IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory,
             IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects, int[] projectionColumns,
             int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
             int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
@@ -137,6 +150,7 @@
         this.frameValueExists = frameValueEvalFactories != null && frameValueEvalFactories.length > 0;
         this.frameStartEvalFactories = frameStartEvalFactories;
         this.frameStartExists = frameStartEvalFactories != null && frameStartEvalFactories.length > 0;
+        this.frameStartIsMonotonic = frameStartExists && frameStartIsMonotonic;
         this.frameEndEvalFactories = frameEndEvalFactories;
         this.frameEndExists = frameEndEvalFactories != null && frameEndEvalFactories.length > 0;
         this.frameValueComparatorFactories = frameValueComparatorFactories;
@@ -158,7 +172,7 @@
 
         if (frameValueExists) {
             frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
-            frameValueComparators = createBinaryComparators(frameValueComparatorFactories);
+            frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
             frameValuePointables = createPointables(frameValueEvalFactories.length);
         }
         if (frameStartExists) {
@@ -190,16 +204,26 @@
     }
 
     @Override
+    protected void beginPartitionImpl() throws HyracksDataException {
+        super.beginPartitionImpl();
+        chunkIdxFrameStartGlobal = -1;
+        tBeginIdxFrameStartGlobal = -1;
+        readerPosFrameStartGlobal = -1;
+        runFrameChunkId = -1;
+    }
+
+    @Override
     protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
+        boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
+
         long readerPos = -1;
         int nChunks = getPartitionChunkCount();
         if (nChunks > 1) {
             readerPos = reader.position();
             if (chunkIdx == 0) {
                 ByteBuffer curFrameBuffer = curFrame.getBuffer();
-                int nBlocks = FrameHelper.deserializeNumOfMinFrame(curFrameBuffer);
-                copyFrame2.ensureFrameSize(copyFrame2.getMinSize() * nBlocks);
                 int pos = curFrameBuffer.position();
+                copyFrame2.ensureFrameSize(curFrameBuffer.capacity());
                 FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer());
                 curFrameBuffer.position(pos);
             }
@@ -216,19 +240,13 @@
 
             // frame boundaries
             if (frameStartExists) {
-                for (int i = 0; i < frameStartEvals.length; i++) {
-                    frameStartEvals[i].evaluate(tRef, frameStartPointables[i]);
-                }
+                evaluate(frameStartEvals, tRef, frameStartPointables);
             }
             if (frameEndExists) {
-                for (int i = 0; i < frameEndEvals.length; i++) {
-                    frameEndEvals[i].evaluate(tRef, frameEndPointables[i]);
-                }
+                evaluate(frameEndEvals, tRef, frameEndPointables);
             }
             if (frameExcludeExists) {
-                for (int i = 0; i < frameExcludeEvals.length; i++) {
-                    frameExcludeEvals[i].evaluate(tRef, frameExcludePointables[i]);
-                }
+                evaluate(frameExcludeEvals, tRef, frameExcludePointables);
             }
             int toSkip = 0;
             if (frameOffsetExists) {
@@ -241,37 +259,65 @@
             // aggregator created by WindowAggregatorDescriptorFactory does not process argument tuple in init()
             nestedAgg.init(null, null, -1, null);
 
+            int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0;
+            int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1;
             if (nChunks > 1) {
-                reader.seek(0);
+                reader.seek(frameStartForward ? readerPosFrameStartGlobal : 0);
             }
 
-            frame_loop: for (int chunkIdx2 = 0; chunkIdx2 < nChunks; chunkIdx2++) {
-                IFrame innerFrame;
-                if (chunkIdx2 == 0) {
-                    // first chunk's frame is always in memory
-                    innerFrame = chunkIdx == 0 ? curFrame : copyFrame2;
-                } else {
-                    reader.nextFrame(runFrame);
-                    innerFrame = runFrame;
-                }
-                tAccess2.reset(innerFrame.getBuffer());
+            int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1;
+            long readerPosFrameStartLocal = -1;
 
-                int tBeginIdx2 = getTupleBeginIdx(chunkIdx2);
-                int tEndIdx2 = getTupleEndIdx(chunkIdx2);
-                for (int tIdx2 = tBeginIdx2; tIdx2 <= tEndIdx2; tIdx2++) {
-                    tRef2.reset(tAccess2, tIdx2);
+            frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
+                long readerPosFrameInner;
+                IFrame frameInner;
+                if (chunkIdxInner == 0) {
+                    // first chunk's frame is always in memory
+                    frameInner = chunkIdx == 0 ? curFrame : copyFrame2;
+                    readerPosFrameInner = 0;
+                } else {
+                    readerPosFrameInner = reader.position();
+                    if (runFrameChunkId == chunkIdxInner) {
+                        // runFrame has this chunk, so just advance the reader
+                        reader.seek(readerPosFrameInner + runFrameSize);
+                    } else {
+                        reader.nextFrame(runFrame);
+                        runFrameSize = reader.position() - readerPosFrameInner;
+                        runFrameChunkId = chunkIdxInner;
+                    }
+                    frameInner = runFrame;
+                }
+                tAccess2.reset(frameInner.getBuffer());
+
+                int tBeginIdxInner;
+                if (tBeginIdxInnerStart < 0) {
+                    tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
+                } else {
+                    tBeginIdxInner = tBeginIdxInnerStart;
+                    tBeginIdxInnerStart = -1;
+                }
+                int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
+
+                for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner; tIdxInner++) {
+                    tRef2.reset(tAccess2, tIdxInner);
 
                     if (frameStartExists || frameEndExists) {
-                        for (int frameValueIdx = 0; frameValueIdx < frameValueEvals.length; frameValueIdx++) {
-                            frameValueEvals[frameValueIdx].evaluate(tRef2, frameValuePointables[frameValueIdx]);
-                        }
-                        if (frameStartExists
-                                && compare(frameValuePointables, frameStartPointables, frameValueComparators) < 0) {
-                            // skip if value < start
-                            continue;
+                        evaluate(frameValueEvals, tRef2, frameValuePointables);
+                        if (frameStartExists) {
+                            if (frameValueComparators.compare(frameValuePointables, frameStartPointables) < 0) {
+                                // skip if value < start
+                                continue;
+                            }
+                            if (chunkIdxFrameStartLocal < 0) {
+                                // save position of the first tuple that matches the frame start.
+                                // we'll continue from it in the next frame iteration
+                                chunkIdxFrameStartLocal = chunkIdxInner;
+                                tBeginIdxFrameStartLocal = tIdxInner;
+                                readerPosFrameStartLocal = readerPosFrameInner;
+                            }
                         }
                         if (frameEndExists
-                                && compare(frameValuePointables, frameEndPointables, frameValueComparators) > 0) {
+                                && frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
                             // skip and exit if value > end
                             break frame_loop;
                         }
@@ -288,7 +334,7 @@
                     }
 
                     if (toWrite != 0) {
-                        nestedAgg.aggregate(tAccess2, tIdx2, null, -1, null);
+                        nestedAgg.aggregate(tAccess2, tIdxInner, null, -1, null);
                     }
                     if (toWrite > 0) {
                         toWrite--;
@@ -301,6 +347,19 @@
 
             nestedAgg.outputFinalResult(tupleBuilder, null, -1, null);
             appendToFrameFromTupleBuilder(tupleBuilder);
+
+            if (frameStartIsMonotonic) {
+                if (chunkIdxFrameStartLocal >= 0) {
+                    chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal;
+                    tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal;
+                    readerPosFrameStartGlobal = readerPosFrameStartLocal;
+                } else {
+                    // frame start not found, set start beyond the last chunk
+                    chunkIdxFrameStartGlobal = nChunks;
+                    tBeginIdxFrameStartGlobal = 0;
+                    readerPosFrameStartGlobal = 0;
+                }
+            }
         }
 
         if (nChunks > 1) {
@@ -311,7 +370,7 @@
     private boolean isExcluded() throws HyracksDataException {
         for (int i = 0; i < frameExcludeEvals.length; i++) {
             frameExcludeEvals[i].evaluate(tRef2, frameExcludePointable2);
-            boolean b = DataUtils.compare(frameExcludePointables[i], frameExcludePointable2,
+            boolean b = DataUtils.compare(frameExcludePointables.getField(i), frameExcludePointable2,
                     frameExcludeComparators[i]) != 0;
             if (i >= frameExcludeNegationStartIdx) {
                 b = !b;
@@ -337,22 +396,18 @@
         return evals;
     }
 
-    private static IPointable[] createPointables(int ln) {
+    private static void evaluate(IScalarEvaluator[] evals, IFrameTupleReference inTuple,
+            PointableTupleReference outTuple) throws HyracksDataException {
+        for (int i = 0; i < evals.length; i++) {
+            evals[i].evaluate(inTuple, outTuple.getField(i));
+        }
+    }
+
+    private static PointableTupleReference createPointables(int ln) {
         IPointable[] pointables = new IPointable[ln];
         for (int i = 0; i < ln; i++) {
             pointables[i] = VoidPointable.FACTORY.createPointable();
         }
-        return pointables;
-    }
-
-    private static int compare(IValueReference[] first, IValueReference[] second, IBinaryComparator[] comparators)
-            throws HyracksDataException {
-        for (int i = 0; i < first.length; i++) {
-            int c = DataUtils.compare(first[i], second[i], comparators[i]);
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
+        return new PointableTupleReference(pointables);
     }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
index 640e260..16591d5 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
@@ -40,6 +40,8 @@
 
     private final IScalarEvaluatorFactory[] frameStartEvalFactories;
 
+    private final boolean frameStartIsMonotonic;
+
     private final IScalarEvaluatorFactory[] frameEndEvalFactories;
 
     private final IBinaryComparatorFactory[] frameValueComparatorFactories;
@@ -64,9 +66,9 @@
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
             IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories,
-            IScalarEvaluatorFactory[] frameEndEvalFactories, IScalarEvaluatorFactory[] frameExcludeEvalFactories,
-            int frameExcludeNegationStartIdx, IBinaryComparatorFactory[] frameExcludeComparatorFactories,
-            IScalarEvaluatorFactory frameOffsetEvalFactory,
+            boolean frameStartIsMonotonic, IScalarEvaluatorFactory[] frameEndEvalFactories,
+            IScalarEvaluatorFactory[] frameExcludeEvalFactories, int frameExcludeNegationStartIdx,
+            IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory,
             IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects,
             int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
@@ -75,6 +77,7 @@
                 projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
         this.frameValueEvalFactories = frameValueEvalFactories;
         this.frameStartEvalFactories = frameStartEvalFactories;
+        this.frameStartIsMonotonic = frameStartIsMonotonic;
         this.frameEndEvalFactories = frameEndEvalFactories;
         this.frameValueComparatorFactories = frameValueComparatorFactories;
         this.frameExcludeEvalFactories = frameExcludeEvalFactories;
@@ -91,10 +94,10 @@
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
         return new WindowNestedPlansPushRuntime(partitionColumns, partitionComparatorFactories,
                 orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories,
-                frameStartEvalFactories, frameEndEvalFactories, frameExcludeEvalFactories, frameExcludeNegationStartIdx,
-                frameExcludeComparatorFactories, frameOffsetEvalFactory, binaryIntegerInspectorFactory, frameMaxObjects,
-                projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory,
-                ctx);
+                frameStartEvalFactories, frameStartIsMonotonic, frameEndEvalFactories, frameExcludeEvalFactories,
+                frameExcludeNegationStartIdx, frameExcludeComparatorFactories, frameOffsetEvalFactory,
+                binaryIntegerInspectorFactory, frameMaxObjects, projectionList, runningAggOutColumns,
+                runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
new file mode 100644
index 0000000..1d947c1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.accessors;
+
+import org.apache.hyracks.data.std.api.IPointable;
+
+/**
+ * A tuple reference implementation that holds fields in a {@link IPointable} array
+ */
+public class PointableTupleReference implements ITupleReference {
+
+    private final IPointable[] fields;
+
+    public PointableTupleReference(IPointable[] fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fields.length;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return getField(fIdx).getByteArray();
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return getField(fIdx).getStartOffset();
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return getField(fIdx).getLength();
+    }
+
+    public IPointable getField(int fIdx) {
+        return fields[fIdx];
+    }
+}
\ No newline at end of file