[NO ISSUE][COMP] Window operator runtime optimization
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add optimized runtime for window operator that computes
nested aggregates over the whole partition (unbounded frame)
- Do not generate unnecessary expressions when compiling
window operator with unbounded frame
Change-Id: If34d8eb05c069257c974f61810bee399136825fa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3124
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index b39136f..ce857ea 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -1202,8 +1202,8 @@
LogicalVariable denseRankVar = context.newVar();
ListSet<LogicalVariable> usedVars = new ListSet<>();
- frameValueExprRefs = translateWindowFrameMode(winFrameMode, orderExprListOut, rowNumVar, denseRankVar,
- usedVars, sourceLoc);
+ frameValueExprRefs = translateWindowFrameMode(winFrameMode, winFrameStartKind, winFrameEndKind,
+ orderExprListOut, rowNumVar, denseRankVar, usedVars, sourceLoc);
Pair<List<Mutable<ILogicalExpression>>, Integer> frameExclusionResult =
translateWindowExclusion(winFrameExclusionKind, rowNumVar, denseRankVar, usedVars, sourceLoc);
@@ -1223,16 +1223,16 @@
currentOpRef = new MutableObject<>(helperWinOp);
}
- Pair<List<Mutable<ILogicalExpression>>, ILogicalOperator> frameStartResult =
- translateWindowBoundary(winFrameStartKind, winFrameStartExpr, frameValueExprRefs, currentOpRef);
+ Pair<List<Mutable<ILogicalExpression>>, ILogicalOperator> frameStartResult = translateWindowBoundary(
+ winFrameStartKind, winFrameStartExpr, frameValueExprRefs, orderExprListOut, currentOpRef);
if (frameStartResult != null) {
frameStartExprRefs = frameStartResult.first;
if (frameStartResult.second != null) {
currentOpRef = new MutableObject<>(frameStartResult.second);
}
}
- Pair<List<Mutable<ILogicalExpression>>, ILogicalOperator> frameEndResult =
- translateWindowBoundary(winFrameEndKind, winFrameEndExpr, frameValueExprRefs, currentOpRef);
+ Pair<List<Mutable<ILogicalExpression>>, ILogicalOperator> frameEndResult = translateWindowBoundary(
+ winFrameEndKind, winFrameEndExpr, frameValueExprRefs, orderExprListOut, currentOpRef);
if (frameEndResult != null) {
frameEndExprRefs = frameEndResult.first;
if (frameEndResult.second != null) {
@@ -1377,10 +1377,17 @@
}
private List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> translateWindowFrameMode(
- WindowExpression.FrameMode frameMode,
+ WindowExpression.FrameMode frameMode, WindowExpression.FrameBoundaryKind winFrameStartKind,
+ WindowExpression.FrameBoundaryKind winFrameEndKind,
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprList, LogicalVariable rowNumVar,
LogicalVariable denseRankVar, Set<LogicalVariable> outUsedVars, SourceLocation sourceLoc)
throws CompilationException {
+ // if the frame is unbounded then no need to generate the frame value expression
+ // because it will not be used
+ if (winFrameStartKind == WindowExpression.FrameBoundaryKind.UNBOUNDED_PRECEDING
+ && winFrameEndKind == WindowExpression.FrameBoundaryKind.UNBOUNDED_FOLLOWING) {
+ return Collections.emptyList();
+ }
switch (frameMode) {
case RANGE:
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> result =
@@ -1454,7 +1461,13 @@
private Pair<List<Mutable<ILogicalExpression>>, ILogicalOperator> translateWindowBoundary(
WindowExpression.FrameBoundaryKind boundaryKind, Expression boundaryExpr,
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> valueExprs,
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprList,
Mutable<ILogicalOperator> tupSource) throws CompilationException {
+ // if no ORDER BY in window specification then all rows are considered peers,
+ // so the frame becomes unbounded
+ if (orderExprList.isEmpty()) {
+ return null;
+ }
switch (boundaryKind) {
case CURRENT_ROW:
List<Mutable<ILogicalExpression>> resultExprs = new ArrayList<>(valueExprs.size());
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/window/win_opt_01/win_opt_01_9.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/window/win_opt_01/win_opt_01_9.sqlpp
new file mode 100644
index 0000000..6ba77bf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/window/win_opt_01/win_opt_01_9.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test that helper window operator is optimized away
+ * : if the frame is unbounded
+ * Expected Res : SUCCESS (one window operator in the optimized plan)
+ */
+
+FROM range(1, 10) x
+SELECT x,
+ sum(x) OVER (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `sum`
+ORDER BY x;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_9.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_9.plan
new file mode 100644
index 0000000..ac920c2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_9.plan
@@ -0,0 +1,14 @@
+-- DISTRIBUTE_RESULT |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- STREAM_PROJECT |LOCAL|
+ -- ASSIGN |LOCAL|
+ -- WINDOW |LOCAL|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- STABLE_SORT [$$x(ASC)] |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_01/win_opt_01.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_01/win_opt_01.9.query.sqlpp
new file mode 100644
index 0000000..a89bfb4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_01/win_opt_01.9.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test that helper window operator is optimized away
+ * : if the frame is unbounded
+ * Expected Res : SUCCESS (one window operator in the optimized plan)
+ */
+
+FROM range(1, 10) x
+SELECT x,
+ sum(x) OVER (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `sum`
+ORDER BY x
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_01/win_opt_01.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_01/win_opt_01.9.adm
new file mode 100644
index 0000000..3c62f1d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_01/win_opt_01.9.adm
@@ -0,0 +1,10 @@
+{ "x": 1, "sum": 55 }
+{ "x": 2, "sum": 55 }
+{ "x": 3, "sum": 55 }
+{ "x": 4, "sum": 55 }
+{ "x": 5, "sum": 55 }
+{ "x": 6, "sum": 55 }
+{ "x": 7, "sum": 55 }
+{ "x": 8, "sum": 55 }
+{ "x": 9, "sum": 55 }
+{ "x": 10, "sum": 55 }
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 7389d8e..c8168d1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -59,6 +59,7 @@
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansUnboundedRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowMaterializingRuntimeFactory;
@@ -158,20 +159,24 @@
IExpressionRuntimeProvider exprRuntimeProvider = context.getExpressionRuntimeProvider();
IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = context.getBinaryComparatorFactoryProvider();
- IScalarEvaluatorFactory[] frameStartExprEvals = createEvaluatorFactories(winOp.getFrameStartExpressions(),
- inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+ List<Mutable<ILogicalExpression>> frameStartExprList = winOp.getFrameStartExpressions();
+ IScalarEvaluatorFactory[] frameStartExprEvals =
+ createEvaluatorFactories(frameStartExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
- IScalarEvaluatorFactory[] frameEndExprEvals = createEvaluatorFactories(winOp.getFrameEndExpressions(),
- inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+ List<Mutable<ILogicalExpression>> frameEndExprList = winOp.getFrameEndExpressions();
+ IScalarEvaluatorFactory[] frameEndExprEvals =
+ createEvaluatorFactories(frameEndExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList =
+ winOp.getFrameValueExpressions();
Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameValueExprEvalsAndComparators =
- createEvaluatorAndComparatorFactories(winOp.getFrameValueExpressions(), Pair::getSecond, Pair::getFirst,
- inputSchemas, inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
+ createEvaluatorAndComparatorFactories(frameValueExprList, Pair::getSecond, Pair::getFirst, inputSchemas,
+ inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
+ List<Mutable<ILogicalExpression>> frameExcludeExprList = winOp.getFrameExcludeExpressions();
Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameExcludeExprEvalsAndComparators =
- createEvaluatorAndComparatorFactories(winOp.getFrameExcludeExpressions(), v -> v,
- v -> OrderOperator.ASC_ORDER, inputSchemas, inputTypeEnv, exprRuntimeProvider,
- binaryComparatorFactoryProvider, context);
+ createEvaluatorAndComparatorFactories(frameExcludeExprList, v -> v, v -> OrderOperator.ASC_ORDER,
+ inputSchemas, inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
IScalarEvaluatorFactory frameOffsetExprEval = null;
ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue();
@@ -201,14 +206,24 @@
int aggregatorOutputSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans;
WindowAggregatorDescriptorFactory nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
- runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
- orderComparatorFactories, frameValueExprEvalsAndComparators.first,
- frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameEndExprEvals,
- frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(),
- frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval,
- context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(),
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory);
+
+ boolean useUnboundedRuntime = frameStartExprList.isEmpty() && frameEndExprList.isEmpty()
+ && frameExcludeExprList.isEmpty() && frameOffsetExprEval == null;
+ if (useUnboundedRuntime) {
+ runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
+ partitionComparatorFactories, orderComparatorFactories, winOp.getFrameMaxObjects(),
+ projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
+ aggregatorOutputSchemaSize, nestedAggFactory);
+ } else {
+ runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ orderComparatorFactories, frameValueExprEvalsAndComparators.first,
+ frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameEndExprEvals,
+ frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(),
+ frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval,
+ context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(),
+ projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
+ aggregatorOutputSchemaSize, nestedAggFactory);
+ }
} else if (partitionMaterialization) {
runtime = new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
index ec13c5f..661bb8a 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
@@ -84,7 +84,7 @@
}
@Override
- protected void beginPartitionImpl() {
+ protected void beginPartitionImpl() throws HyracksDataException {
chunkEndIdx.clear();
partitionLength = 0;
if (run != null) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
new file mode 100644
index 0000000..4ceda1e
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+/**
+ * Optimized runtime for window operators that performs partition materialization and can evaluate running aggregates
+ * as well as regular aggregates (in nested plans) over <b>unbounded</b> window frames.
+ * An unbounded frame is equivalent to the whole partition, so nested aggregates can only
+ * be evaluated once per partition and their results returned for each row in the partition
+ * (the result remains the same for each row).
+ * <p>
+ * In addition to the unbounded frame specification the following conditions must be met:
+ * <ul>
+ * <li>no frame exclusion</li>
+ * <li>no frame offset</li>
+ * </ul>
+ */
+public class WindowNestedPlansUnboundedPushRuntime extends WindowMaterializingPushRuntime {
+
+ private final int nestedAggOutSchemaSize;
+
+ private final WindowAggregatorDescriptorFactory nestedAggFactory;
+
+ private IAggregatorDescriptor nestedAgg;
+
+ private ArrayTupleBuilder nestedAggOutputBuilder;
+
+ private final int frameMaxObjects;
+
+ private int toWrite;
+
+ WindowNestedPlansUnboundedPushRuntime(int[] partitionColumns,
+ IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories, int frameMaxObjects, int[] projectionColumns,
+ int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
+ int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+ super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
+ runningAggOutColumns, runningAggFactories, ctx);
+ this.frameMaxObjects = frameMaxObjects;
+ this.nestedAggFactory = nestedAggFactory;
+ this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
+ }
+
+ @Override
+ protected void init() throws HyracksDataException {
+ super.init();
+ nestedAgg = nestedAggFactory.createAggregator(ctx, null, null, null, null, null, -1);
+ nestedAggOutputBuilder = new ArrayTupleBuilder(nestedAggOutSchemaSize);
+ }
+
+ @Override
+ protected void beginPartitionImpl() throws HyracksDataException {
+ super.beginPartitionImpl();
+ // aggregator created by WindowAggregatorDescriptorFactory does not process argument tuple in init()
+ nestedAgg.init(null, null, -1, null);
+ nestedAggOutputBuilder.reset();
+ toWrite = frameMaxObjects;
+ }
+
+ @Override
+ protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
+ throws HyracksDataException {
+ super.partitionChunkImpl(frameId, frameBuffer, tBeginIdx, tEndIdx);
+ tAccess.reset(frameBuffer);
+ for (int t = tBeginIdx; t <= tEndIdx && toWrite != 0; t++) {
+ nestedAgg.aggregate(tAccess, t, null, -1, null);
+ if (toWrite > 0) {
+ toWrite--;
+ }
+ }
+ }
+
+ @Override
+ protected void endPartitionImpl() throws HyracksDataException {
+ nestedAgg.outputFinalResult(nestedAggOutputBuilder, null, -1, null);
+ super.endPartitionImpl();
+ }
+
+ @Override
+ protected void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ FrameTupleReference tupleRef) throws HyracksDataException {
+ super.produceTuple(tb, accessor, tIndex, tupleRef);
+ TupleUtils.addFields(nestedAggOutputBuilder, tb);
+ }
+
+ @Override
+ protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) {
+ return new ArrayTupleBuilder(projectionList.length + nestedAggOutSchemaSize);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
new file mode 100644
index 0000000..f89a8e5
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+/**
+ * Optimized runtime for window operators that performs partition materialization and can evaluate running aggregates
+ * as well as regular aggregates (in nested plans) over <b>unbounded</b> window frames.
+ */
+public final class WindowNestedPlansUnboundedRuntimeFactory extends AbstractWindowRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int frameMaxObjects;
+
+ private final int nestedAggOutSchemaSize;
+
+ private final WindowAggregatorDescriptorFactory nestedAggFactory;
+
+ public WindowNestedPlansUnboundedRuntimeFactory(int[] partitionColumns,
+ IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories, int frameMaxObjects,
+ int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory) {
+ super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
+ projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+ this.frameMaxObjects = frameMaxObjects;
+ this.nestedAggFactory = nestedAggFactory;
+ this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+ return new WindowNestedPlansUnboundedPushRuntime(partitionColumns, partitionComparatorFactories,
+ orderComparatorFactories, frameMaxObjects, projectionList, runningAggOutColumns, runningAggFactories,
+ nestedAggOutSchemaSize, nestedAggFactory, ctx);
+ }
+
+ @Override
+ public String toString() {
+ return "window [nested-unbounded] (" + Arrays.toString(partitionColumns) + ") "
+ + Arrays.toString(runningAggOutColumns) + " := " + Arrays.toString(runningAggFactories);
+ }
+}