[NO ISSUE][RT] Window operator runtime optimization
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Runtime optimization for the window operator with a monotonic
frame start expression. In this case, continue scanning
from the beginning of the frame that was found in the
previous iteration.
- Allow inlining variables into window operator expressions
except PARTITION BY, ORDER BY and frame value expressions
Change-Id: I65bed4092f4fd3622f1525b26ce25e2ac07d7538
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3135
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index 6749bf9..d1ce865 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
public class AnalysisUtil {
@@ -129,6 +130,39 @@
return new Pair<>(dataverseName, datasetName);
}
+    /**
+     * Tells whether a window frame boundary is a monotonically non-decreasing function of the frame value
+     * variable.
+     * <p>
+     * The result is {@code true} only if there is exactly one frame value expression and it is a variable
+     * reference, there is exactly one boundary expression, and that boundary expression is one of:
+     * <ul>
+     * <li>a constant</li>
+     * <li>a variable reference equal to the frame value expression</li>
+     * <li>a call to {@link BuiltinFunctions#NUMERIC_ADD} or {@link BuiltinFunctions#NUMERIC_SUBTRACT} whose
+     * first argument equals the frame value expression and whose second argument is a constant</li>
+     * </ul>
+     *
+     * @param frameBoundaryExprList frame boundary expressions to examine
+     * @param frameValueExprList frame value expressions paired with their order
+     * @return {@code true} if the boundary matches one of the shapes listed above, {@code false} otherwise
+     * @throws IllegalStateException if the boundary expression tag is none of CONSTANT, VARIABLE or FUNCTION_CALL
+     */
+    public static boolean isWindowFrameBoundaryMonotonic(List<Mutable<ILogicalExpression>> frameBoundaryExprList,
+            List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList) {
+        // Exactly one frame value expression is required ...
+        if (frameValueExprList.size() != 1) {
+            return false;
+        }
+        // ... it has to be a plain variable reference, and there must be exactly one boundary expression.
+        ILogicalExpression valueExpr = frameValueExprList.get(0).second.getValue();
+        boolean valueIsVariable = valueExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE;
+        if (!valueIsVariable || frameBoundaryExprList.size() != 1) {
+            return false;
+        }
+        ILogicalExpression boundaryExpr = frameBoundaryExprList.get(0).getValue();
+        switch (boundaryExpr.getExpressionTag()) {
+            case CONSTANT:
+                return true;
+            case VARIABLE:
+                return boundaryExpr.equals(valueExpr);
+            case FUNCTION_CALL: {
+                AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) boundaryExpr;
+                FunctionIdentifier fid = callExpr.getFunctionIdentifier();
+                boolean addOrSubtract =
+                        BuiltinFunctions.NUMERIC_ADD.equals(fid) || BuiltinFunctions.NUMERIC_SUBTRACT.equals(fid);
+                if (!addOrSubtract) {
+                    return false;
+                }
+                // The first argument must be the frame value itself ...
+                if (!callExpr.getArguments().get(0).getValue().equals(valueExpr)) {
+                    return false;
+                }
+                // ... and the second argument a constant.
+                ILogicalExpression secondArg = callExpr.getArguments().get(1).getValue();
+                return secondArg.getExpressionTag() == LogicalExpressionTag.CONSTANT;
+            }
+            default:
+                throw new IllegalStateException(String.valueOf(boundaryExpr.getExpressionTag()));
+        }
+    }
+
private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<>();
static {
@@ -136,5 +170,4 @@
fieldAccessFunctions.add(BuiltinFunctions.GET_HANDLE);
fieldAccessFunctions.add(BuiltinFunctions.TYPE_OF);
}
-
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index cd75c1e..ce9fd03 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -31,6 +31,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.optimizer.base.AnalysisUtil;
import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
import org.apache.asterix.optimizer.rules.am.BTreeJobGenParams;
import org.apache.commons.lang3.mutable.Mutable;
@@ -380,7 +381,8 @@
}
}
}
-
- return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns);
+ boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(),
+ winOp.getFrameValueExpressions());
+ return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic);
}
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
new file mode 100644
index 0000000..47f2ce9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Tests runtime optimizations of window functions
+ * Expected Res : SUCCESS
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+/*
+ * For each x in range(1, N), compares sum(x) over the RANGE frame
+ * [x - 1, x + 1] with a closed-form expected value and reports the
+ * minimum and maximum difference (both are 0 when every frame sum matches).
+ * A frame that contains x - 1, x and x + 1 sums to 3 * x; at x = N the
+ * (x + 1) term is subtracted because the input has no row after N.
+ * At x = 1 the missing predecessor would be 0, so 3 * x needs no
+ * correction there.
+ */
+create function q1_sum_1_preceding_1_following(N) {
+ from
+ range(1, N) x
+ let
+ result_expected = 3 * x - (case x when N then x + 1 else 0 end),
+ result_actual = sum(x) over (order by x range between 1 preceding and 1 following),
+ result_delta = result_expected - result_actual
+ select
+ min(result_delta) min_delta,
+ max(result_delta) max_delta
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
new file mode 100644
index 0000000..2d561b9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with monotonic frame start expression
+ * : on a dataset that fits into one physical frame
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+q1_sum_1_preceding_1_following(10);
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
new file mode 100644
index 0000000..fec6158
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with monotonic frame start expression
+ * : on a dataset that spans several physical frames
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+q1_sum_1_preceding_1_following(10000);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
new file mode 100644
index 0000000..84b5234
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with monotonic frame start expression
+ * : on a dataset that spans several physical frames, with a window frame that also spans several physical frames
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+with N as 10000, W as 5000
+
+from (
+ from
+ range(1, N) x
+ select value
+ sum(x) over (order by x range between W preceding and W following)
+) v
+select value sum(v)
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
new file mode 100644
index 0000000..8a4374f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with monotonic frame start expression
+ * : on a dataset that spans several physical frames
+ * : with a frame that starts before current physical frame
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+with N as 10000, W as 5000
+
+from (
+ from
+ range(1, N) x
+ select value
+ sum(x) over (order by x range between W + 2 preceding and W preceding)
+) v
+select value sum(v)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
new file mode 100644
index 0000000..91c0a31
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with monotonic frame start expression
+ * : on a dataset that spans several physical frames
+ * : with a frame that starts after current physical frame
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+with N as 10000, W as 5000
+
+from (
+ from
+ range(1, N) x
+ select value
+ sum(x) over (order by x range between W following and W + 2 following)
+) v
+select value sum(v)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm
new file mode 100644
index 0000000..6115ead
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm
@@ -0,0 +1 @@
+{ "min_delta": 0, "max_delta": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm
new file mode 100644
index 0000000..6115ead
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm
@@ -0,0 +1 @@
+{ "min_delta": 0, "max_delta": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm
new file mode 100644
index 0000000..76a77d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm
@@ -0,0 +1 @@
+375062502500
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm
new file mode 100644
index 0000000..77dda1e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm
@@ -0,0 +1 @@
+37492501
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm
new file mode 100644
index 0000000..a25255a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm
@@ -0,0 +1 @@
+112492496
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0c8ac6a..455b3ea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -9311,6 +9311,11 @@
<output-dir compare="Text">win_opt_01</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="window">
+ <compilation-unit name="win_opt_02">
+ <output-dir compare="Text">win_opt_02</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="writers">
<test-case FilePath="writers">
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
index 8f563b3..bdfdac8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
@@ -40,9 +40,12 @@
/**
* Window operator evaluates window functions. It has the following components:
* <ul>
- * <li>{@link #partitionExpressions} - define how input data must be partitioned</li>
- * <li>{@link #orderExpressions} - define how data inside these partitions must be ordered</li>
- * <li>{@link #frameValueExpressions} - value expressions for comparing against frame start / end boundaries and frame exclusion</li>
+ * <li>{@link #partitionExpressions} - define how input data must be partitioned.
+ * Each must be a variable reference</li>
+ * <li>{@link #orderExpressions} - define how data inside these partitions must be ordered.
+ * Each must be a variable reference</li>
+ * <li>{@link #frameValueExpressions} - value expressions for comparing against frame start / end boundaries and frame exclusion.
+ * Each must be a variable reference</li>
* <li>{@link #frameStartExpressions} - frame start boundary</li>
* <li>{@link #frameEndExpressions} - frame end boundary</li>
* <li>{@link #frameExcludeExpressions} - define values to be excluded from the frame</li>
@@ -217,15 +220,27 @@
@Override
public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+ return acceptExpressionTransform(visitor, true);
+ }
+
+ /**
+ * Allows performing expression transformation only on a subset of this operator's expressions
+ * @param visitor transforming visitor
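+ * @param visitVarRefRequiringExprs whether to also visit the expressions that must be variable references
+ * ({@link #partitionExpressions}, {@link #orderExpressions} and {@link #frameValueExpressions})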
+ */
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor,
+ boolean visitVarRefRequiringExprs) throws AlgebricksException {
boolean mod = false;
- for (Mutable<ILogicalExpression> expr : partitionExpressions) {
- mod |= visitor.transform(expr);
- }
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) {
- mod |= visitor.transform(p.second);
- }
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : frameValueExpressions) {
- mod |= visitor.transform(p.second);
+ if (visitVarRefRequiringExprs) {
+ for (Mutable<ILogicalExpression> expr : partitionExpressions) {
+ mod |= visitor.transform(expr);
+ }
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) {
+ mod |= visitor.transform(p.second);
+ }
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : frameValueExpressions) {
+ mod |= visitor.transform(p.second);
+ }
}
for (Mutable<ILogicalExpression> expr : frameStartExpressions) {
mod |= visitor.transform(expr);
@@ -305,4 +320,14 @@
expr.getValue().getUsedVariables(vars);
}
}
+
+ /**
+ * Only the following expressions require variable references: {@link #partitionExpressions},
+ * {@link #orderExpressions}, and {@link #frameValueExpressions}; the others do not.
+ * Use {@link #acceptExpressionTransform(ILogicalExpressionReferenceTransform, boolean)}
+ * to visit only the expressions that do not require variable references.
+ *
+ * @return always {@code false}
+ */
+ public boolean requiresVariableReferenceExpressions() {
+ return false;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index c8168d1..2a8658d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -75,11 +75,14 @@
private final List<OrderColumn> orderColumns;
+ private final boolean frameStartIsMonotonic;
+
public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
- List<OrderColumn> orderColumns) {
+ List<OrderColumn> orderColumns, boolean frameStartIsMonotonic) {
this.partitionColumns = partitionColumns;
this.partitionMaterialization = partitionMaterialization;
this.orderColumns = orderColumns;
+ this.frameStartIsMonotonic = frameStartIsMonotonic;
}
@Override
@@ -217,10 +220,10 @@
} else {
runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
orderComparatorFactories, frameValueExprEvalsAndComparators.first,
- frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameEndExprEvals,
- frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(),
- frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval,
- context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(),
+ frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartIsMonotonic,
+ frameEndExprEvals, frameExcludeExprEvalsAndComparators.first,
+ winOp.getFrameExcludeNegationStartIdx(), frameExcludeExprEvalsAndComparators.second,
+ frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(),
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
aggregatorOutputSchemaSize, nestedAggFactory);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 9d5cdeb..d521831 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -120,9 +120,9 @@
public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException;
- public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
+ public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T arg) throws AlgebricksException;
- public R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
+ public R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T arg) throws AlgebricksException;
public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
new file mode 100644
index 0000000..9350f95
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.visitors;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+
+/**
+ * This visitor performs expression transformation on each operator by calling
+ * {@link ILogicalOperator#acceptExpressionTransform(ILogicalExpressionReferenceTransform)}.
+ * Subclasses can override individual {@code visit*} methods to customize which expressions must be transformed
+ * based on the operator kind. This functionality is required in cases when only a subset of operator's expressions
+ * must be transformed.
+ *
+ * @see WindowOperator#acceptExpressionTransform(ILogicalExpressionReferenceTransform, boolean)
+ */
+public abstract class LogicalExpressionReferenceTransformVisitor
+ implements ILogicalOperatorVisitor<Boolean, ILogicalExpressionReferenceTransform> {
+
+ /**
+ * Applies the given transform to the operator by calling
+ * {@link ILogicalOperator#acceptExpressionTransform(ILogicalExpressionReferenceTransform)}.
+ * The {@code visit*} methods of this class delegate to this method.
+ *
+ * @param op operator whose expressions are to be transformed
+ * @param transform transform handed to the operator
+ * @return the value returned by {@code op.acceptExpressionTransform(transform)}
+ * @throws AlgebricksException if the operator's {@code acceptExpressionTransform} throws it
+ */
+ protected boolean visitOperator(ILogicalOperator op, ILogicalExpressionReferenceTransform transform)
+ throws AlgebricksException {
+ return op.acceptExpressionTransform(transform);
+ }
+
+ @Override
+ public Boolean visitAggregateOperator(AggregateOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitRunningAggregateOperator(RunningAggregateOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitGroupByOperator(GroupByOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitLimitOperator(LimitOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitInnerJoinOperator(InnerJoinOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitLeftOuterJoinOperator(LeftOuterJoinOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
+ ILogicalExpressionReferenceTransform arg) throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitOrderOperator(OrderOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitAssignOperator(AssignOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitSelectOperator(SelectOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitDelegateOperator(DelegateOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitProjectOperator(ProjectOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitReplicateOperator(ReplicateOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitSplitOperator(SplitOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitMaterializeOperator(MaterializeOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitScriptOperator(ScriptOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitSubplanOperator(SubplanOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitSinkOperator(SinkOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitUnionOperator(UnionAllOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitIntersectOperator(IntersectOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitUnnestOperator(UnnestOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitUnnestMapOperator(UnnestMapOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op,
+ ILogicalExpressionReferenceTransform arg) throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitDataScanOperator(DataSourceScanOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitDistinctOperator(DistinctOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitExchangeOperator(ExchangeOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitWriteOperator(WriteOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitDistributeResultOperator(DistributeResultOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitWriteResultOperator(WriteResultOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op,
+ ILogicalExpressionReferenceTransform arg) throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op,
+ ILogicalExpressionReferenceTransform arg) throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitTokenizeOperator(TokenizeOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitForwardOperator(ForwardOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitWindowOperator(WindowOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
index dc9a11f..f072312 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
@@ -72,7 +72,7 @@
if (!op.requiresVariableReferenceExpressions()) {
inlineVisitor.setOperator(op);
inlineVisitor.setTargetVariable(entry.getKey());
- if (op.acceptExpressionTransform(inlineVisitor)) {
+ if (op.accept(inlineVisitor, inlineVisitor)) {
modified = true;
}
inlineVisitor.setTargetVariable(null);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 729d6f9..2c95ce0 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -35,13 +35,14 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.visitors.LogicalExpressionReferenceTransformVisitor;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -113,7 +114,7 @@
// Only inline variables in operators that can deal with arbitrary expressions.
if (!op.requiresVariableReferenceExpressions()) {
inlineVisitor.setOperator(op);
- return op.acceptExpressionTransform(inlineVisitor);
+ return op.accept(inlineVisitor, inlineVisitor);
}
return false;
}
@@ -199,7 +200,8 @@
return modified;
}
- public static class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform {
+ public static class InlineVariablesVisitor extends LogicalExpressionReferenceTransformVisitor
+ implements ILogicalExpressionReferenceTransform {
private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
private final Set<LogicalVariable> liveVars = new HashSet<>();
@@ -227,9 +229,15 @@
}
@Override
+ public Boolean visitWindowOperator(WindowOperator op, ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return op.acceptExpressionTransform(arg, false);
+ }
+
+ @Override
public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
ILogicalExpression e = exprRef.getValue();
- switch (((AbstractLogicalExpression) e).getExpressionTag()) {
+ switch (e.getExpressionTag()) {
case VARIABLE:
return transformVariableReferenceExpression(exprRef,
((VariableReferenceExpression) e).getVariableReference());
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
index 661bb8a..4e97d6c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
@@ -99,9 +99,8 @@
boolean isFirstChunk = chunkEndIdx.isEmpty();
if (isFirstChunk) {
if (frameId != curFrameId) {
- int nBlocks = FrameHelper.deserializeNumOfMinFrame(frameBuffer);
- curFrame.ensureFrameSize(curFrame.getMinSize() * nBlocks);
int pos = frameBuffer.position();
+ curFrame.ensureFrameSize(frameBuffer.capacity());
FrameUtils.copyAndFlip(frameBuffer, curFrame.getBuffer());
frameBuffer.position(pos);
curFrameId = frameId;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
index e7daf11..565cbe6 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -26,7 +26,6 @@
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -34,15 +33,17 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.DataUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import org.apache.hyracks.storage.common.MultiComparator;
/**
* Runtime for window operators that performs partition materialization and can evaluate running aggregates
@@ -56,11 +57,11 @@
private IScalarEvaluator[] frameValueEvals;
- private IPointable[] frameValuePointables;
+ private PointableTupleReference frameValuePointables;
private final IBinaryComparatorFactory[] frameValueComparatorFactories;
- private IBinaryComparator[] frameValueComparators;
+ private MultiComparator frameValueComparators;
private final boolean frameStartExists;
@@ -68,7 +69,9 @@
private IScalarEvaluator[] frameStartEvals;
- private IPointable[] frameStartPointables;
+ private PointableTupleReference frameStartPointables;
+
+ private final boolean frameStartIsMonotonic;
private final boolean frameEndExists;
@@ -76,7 +79,7 @@
private IScalarEvaluator[] frameEndEvals;
- private IPointable[] frameEndPointables;
+ private PointableTupleReference frameEndPointables;
private final boolean frameExcludeExists;
@@ -86,7 +89,7 @@
private final int frameExcludeNegationStartIdx;
- private IPointable[] frameExcludePointables;
+ private PointableTupleReference frameExcludePointables;
private IPointable frameExcludePointable2;
@@ -116,18 +119,28 @@
private IFrame runFrame;
+ private int runFrameChunkId;
+
+ private long runFrameSize;
+
private FrameTupleAccessor tAccess2;
private FrameTupleReference tRef2;
private IBinaryIntegerInspector bii;
+ private int chunkIdxFrameStartGlobal;
+
+ private int tBeginIdxFrameStartGlobal;
+
+ private long readerPosFrameStartGlobal;
+
WindowNestedPlansPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories,
- IScalarEvaluatorFactory[] frameEndEvalFactories, IScalarEvaluatorFactory[] frameExcludeEvalFactories,
- int frameExcludeNegationStartIdx, IBinaryComparatorFactory[] frameExcludeComparatorFactories,
- IScalarEvaluatorFactory frameOffsetEvalFactory,
+ boolean frameStartIsMonotonic, IScalarEvaluatorFactory[] frameEndEvalFactories,
+ IScalarEvaluatorFactory[] frameExcludeEvalFactories, int frameExcludeNegationStartIdx,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory,
IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects, int[] projectionColumns,
int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
@@ -137,6 +150,7 @@
this.frameValueExists = frameValueEvalFactories != null && frameValueEvalFactories.length > 0;
this.frameStartEvalFactories = frameStartEvalFactories;
this.frameStartExists = frameStartEvalFactories != null && frameStartEvalFactories.length > 0;
+ this.frameStartIsMonotonic = frameStartExists && frameStartIsMonotonic;
this.frameEndEvalFactories = frameEndEvalFactories;
this.frameEndExists = frameEndEvalFactories != null && frameEndEvalFactories.length > 0;
this.frameValueComparatorFactories = frameValueComparatorFactories;
@@ -158,7 +172,7 @@
if (frameValueExists) {
frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
- frameValueComparators = createBinaryComparators(frameValueComparatorFactories);
+ frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
frameValuePointables = createPointables(frameValueEvalFactories.length);
}
if (frameStartExists) {
@@ -190,16 +204,26 @@
}
@Override
+ protected void beginPartitionImpl() throws HyracksDataException {
+ super.beginPartitionImpl();
+ chunkIdxFrameStartGlobal = -1;
+ tBeginIdxFrameStartGlobal = -1;
+ readerPosFrameStartGlobal = -1;
+ runFrameChunkId = -1;
+ }
+
+ @Override
protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
+ boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
+
long readerPos = -1;
int nChunks = getPartitionChunkCount();
if (nChunks > 1) {
readerPos = reader.position();
if (chunkIdx == 0) {
ByteBuffer curFrameBuffer = curFrame.getBuffer();
- int nBlocks = FrameHelper.deserializeNumOfMinFrame(curFrameBuffer);
- copyFrame2.ensureFrameSize(copyFrame2.getMinSize() * nBlocks);
int pos = curFrameBuffer.position();
+ copyFrame2.ensureFrameSize(curFrameBuffer.capacity());
FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer());
curFrameBuffer.position(pos);
}
@@ -216,19 +240,13 @@
// frame boundaries
if (frameStartExists) {
- for (int i = 0; i < frameStartEvals.length; i++) {
- frameStartEvals[i].evaluate(tRef, frameStartPointables[i]);
- }
+ evaluate(frameStartEvals, tRef, frameStartPointables);
}
if (frameEndExists) {
- for (int i = 0; i < frameEndEvals.length; i++) {
- frameEndEvals[i].evaluate(tRef, frameEndPointables[i]);
- }
+ evaluate(frameEndEvals, tRef, frameEndPointables);
}
if (frameExcludeExists) {
- for (int i = 0; i < frameExcludeEvals.length; i++) {
- frameExcludeEvals[i].evaluate(tRef, frameExcludePointables[i]);
- }
+ evaluate(frameExcludeEvals, tRef, frameExcludePointables);
}
int toSkip = 0;
if (frameOffsetExists) {
@@ -241,37 +259,65 @@
// aggregator created by WindowAggregatorDescriptorFactory does not process argument tuple in init()
nestedAgg.init(null, null, -1, null);
+ int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0;
+ int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1;
if (nChunks > 1) {
- reader.seek(0);
+ reader.seek(frameStartForward ? readerPosFrameStartGlobal : 0);
}
- frame_loop: for (int chunkIdx2 = 0; chunkIdx2 < nChunks; chunkIdx2++) {
- IFrame innerFrame;
- if (chunkIdx2 == 0) {
- // first chunk's frame is always in memory
- innerFrame = chunkIdx == 0 ? curFrame : copyFrame2;
- } else {
- reader.nextFrame(runFrame);
- innerFrame = runFrame;
- }
- tAccess2.reset(innerFrame.getBuffer());
+ int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1;
+ long readerPosFrameStartLocal = -1;
- int tBeginIdx2 = getTupleBeginIdx(chunkIdx2);
- int tEndIdx2 = getTupleEndIdx(chunkIdx2);
- for (int tIdx2 = tBeginIdx2; tIdx2 <= tEndIdx2; tIdx2++) {
- tRef2.reset(tAccess2, tIdx2);
+ frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
+ long readerPosFrameInner;
+ IFrame frameInner;
+ if (chunkIdxInner == 0) {
+ // first chunk's frame is always in memory
+ frameInner = chunkIdx == 0 ? curFrame : copyFrame2;
+ readerPosFrameInner = 0;
+ } else {
+ readerPosFrameInner = reader.position();
+ if (runFrameChunkId == chunkIdxInner) {
+ // runFrame has this chunk, so just advance the reader
+ reader.seek(readerPosFrameInner + runFrameSize);
+ } else {
+ reader.nextFrame(runFrame);
+ runFrameSize = reader.position() - readerPosFrameInner;
+ runFrameChunkId = chunkIdxInner;
+ }
+ frameInner = runFrame;
+ }
+ tAccess2.reset(frameInner.getBuffer());
+
+ int tBeginIdxInner;
+ if (tBeginIdxInnerStart < 0) {
+ tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
+ } else {
+ tBeginIdxInner = tBeginIdxInnerStart;
+ tBeginIdxInnerStart = -1;
+ }
+ int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
+
+ for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner; tIdxInner++) {
+ tRef2.reset(tAccess2, tIdxInner);
if (frameStartExists || frameEndExists) {
- for (int frameValueIdx = 0; frameValueIdx < frameValueEvals.length; frameValueIdx++) {
- frameValueEvals[frameValueIdx].evaluate(tRef2, frameValuePointables[frameValueIdx]);
- }
- if (frameStartExists
- && compare(frameValuePointables, frameStartPointables, frameValueComparators) < 0) {
- // skip if value < start
- continue;
+ evaluate(frameValueEvals, tRef2, frameValuePointables);
+ if (frameStartExists) {
+ if (frameValueComparators.compare(frameValuePointables, frameStartPointables) < 0) {
+ // skip if value < start
+ continue;
+ }
+ if (chunkIdxFrameStartLocal < 0) {
+ // save position of the first tuple whose value is not less than the frame start.
+ // we'll continue from it in the next frame iteration
+ chunkIdxFrameStartLocal = chunkIdxInner;
+ tBeginIdxFrameStartLocal = tIdxInner;
+ readerPosFrameStartLocal = readerPosFrameInner;
+ }
}
if (frameEndExists
- && compare(frameValuePointables, frameEndPointables, frameValueComparators) > 0) {
+ && frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
// skip and exit if value > end
break frame_loop;
}
@@ -288,7 +334,7 @@
}
if (toWrite != 0) {
- nestedAgg.aggregate(tAccess2, tIdx2, null, -1, null);
+ nestedAgg.aggregate(tAccess2, tIdxInner, null, -1, null);
}
if (toWrite > 0) {
toWrite--;
@@ -301,6 +347,19 @@
nestedAgg.outputFinalResult(tupleBuilder, null, -1, null);
appendToFrameFromTupleBuilder(tupleBuilder);
+
+ if (frameStartIsMonotonic) {
+ if (chunkIdxFrameStartLocal >= 0) {
+ chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal;
+ tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal;
+ readerPosFrameStartGlobal = readerPosFrameStartLocal;
+ } else {
+ // frame start not found, set start beyond the last chunk
+ chunkIdxFrameStartGlobal = nChunks;
+ tBeginIdxFrameStartGlobal = 0;
+ readerPosFrameStartGlobal = 0;
+ }
+ }
}
if (nChunks > 1) {
@@ -311,7 +370,7 @@
private boolean isExcluded() throws HyracksDataException {
for (int i = 0; i < frameExcludeEvals.length; i++) {
frameExcludeEvals[i].evaluate(tRef2, frameExcludePointable2);
- boolean b = DataUtils.compare(frameExcludePointables[i], frameExcludePointable2,
+ boolean b = DataUtils.compare(frameExcludePointables.getField(i), frameExcludePointable2,
frameExcludeComparators[i]) != 0;
if (i >= frameExcludeNegationStartIdx) {
b = !b;
@@ -337,22 +396,18 @@
return evals;
}
- private static IPointable[] createPointables(int ln) {
+ private static void evaluate(IScalarEvaluator[] evals, IFrameTupleReference inTuple,
+ PointableTupleReference outTuple) throws HyracksDataException {
+ for (int i = 0; i < evals.length; i++) {
+ evals[i].evaluate(inTuple, outTuple.getField(i));
+ }
+ }
+
+ private static PointableTupleReference createPointables(int ln) {
IPointable[] pointables = new IPointable[ln];
for (int i = 0; i < ln; i++) {
pointables[i] = VoidPointable.FACTORY.createPointable();
}
- return pointables;
- }
-
- private static int compare(IValueReference[] first, IValueReference[] second, IBinaryComparator[] comparators)
- throws HyracksDataException {
- for (int i = 0; i < first.length; i++) {
- int c = DataUtils.compare(first[i], second[i], comparators[i]);
- if (c != 0) {
- return c;
- }
- }
- return 0;
+ return new PointableTupleReference(pointables);
}
}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
index 640e260..16591d5 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
@@ -40,6 +40,8 @@
private final IScalarEvaluatorFactory[] frameStartEvalFactories;
+ private final boolean frameStartIsMonotonic;
+
private final IScalarEvaluatorFactory[] frameEndEvalFactories;
private final IBinaryComparatorFactory[] frameValueComparatorFactories;
@@ -64,9 +66,9 @@
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories,
- IScalarEvaluatorFactory[] frameEndEvalFactories, IScalarEvaluatorFactory[] frameExcludeEvalFactories,
- int frameExcludeNegationStartIdx, IBinaryComparatorFactory[] frameExcludeComparatorFactories,
- IScalarEvaluatorFactory frameOffsetEvalFactory,
+ boolean frameStartIsMonotonic, IScalarEvaluatorFactory[] frameEndEvalFactories,
+ IScalarEvaluatorFactory[] frameExcludeEvalFactories, int frameExcludeNegationStartIdx,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory,
IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects,
int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
@@ -75,6 +77,7 @@
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
this.frameValueEvalFactories = frameValueEvalFactories;
this.frameStartEvalFactories = frameStartEvalFactories;
+ this.frameStartIsMonotonic = frameStartIsMonotonic;
this.frameEndEvalFactories = frameEndEvalFactories;
this.frameValueComparatorFactories = frameValueComparatorFactories;
this.frameExcludeEvalFactories = frameExcludeEvalFactories;
@@ -91,10 +94,10 @@
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
return new WindowNestedPlansPushRuntime(partitionColumns, partitionComparatorFactories,
orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories,
- frameStartEvalFactories, frameEndEvalFactories, frameExcludeEvalFactories, frameExcludeNegationStartIdx,
- frameExcludeComparatorFactories, frameOffsetEvalFactory, binaryIntegerInspectorFactory, frameMaxObjects,
- projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory,
- ctx);
+ frameStartEvalFactories, frameStartIsMonotonic, frameEndEvalFactories, frameExcludeEvalFactories,
+ frameExcludeNegationStartIdx, frameExcludeComparatorFactories, frameOffsetEvalFactory,
+ binaryIntegerInspectorFactory, frameMaxObjects, projectionList, runningAggOutColumns,
+ runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
new file mode 100644
index 0000000..1d947c1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.accessors;
+
+import org.apache.hyracks.data.std.api.IPointable;
+
+/**
+ * A tuple reference implementation that holds fields in an {@link IPointable} array
+ */
+public class PointableTupleReference implements ITupleReference {
+
+ private final IPointable[] fields;
+
+ public PointableTupleReference(IPointable[] fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return fields.length;
+ }
+
+ @Override
+ public byte[] getFieldData(int fIdx) {
+ return getField(fIdx).getByteArray();
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ return getField(fIdx).getStartOffset();
+ }
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ return getField(fIdx).getLength();
+ }
+
+ public IPointable getField(int fIdx) {
+ return fields[fIdx];
+ }
+}
\ No newline at end of file