[NO ISSUE][RT] Window operator runtime optimization
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Runtime optimization for window operators with accumulating
frames (from unbounded preceding to current row or to N following)
- Refactor window function properties into generic
builtin function properties
Change-Id: I8d1574defc73076ad960c4067432da29ead160a5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3151
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index d1ce865..9c476e5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -21,6 +21,8 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.metadata.declared.DataSourceId;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.optimizer.rules.am.AccessMethodUtils;
@@ -29,6 +31,7 @@
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
@@ -37,6 +40,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
public class AnalysisUtil {
/*
@@ -131,6 +135,25 @@
}
/**
+ * Checks whether a window operator has a function call whose function has the given property
+ */
+ public static boolean hasFunctionWithProperty(WindowOperator winOp,
+ BuiltinFunctions.WindowFunctionProperty property) throws CompilationException {
+ for (Mutable<ILogicalExpression> exprRef : winOp.getExpressions()) {
+ ILogicalExpression expr = exprRef.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, winOp.getSourceLocation(),
+ expr.getExpressionTag());
+ }
+ AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
+ if (BuiltinFunctions.builtinFunctionHasProperty(callExpr.getFunctionIdentifier(), property)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Checks whether frame boundary expression is a monotonically non-descreasing function over a frame value variable
*/
public static boolean isWindowFrameBoundaryMonotonic(List<Mutable<ILogicalExpression>> frameBoundaryExprList,
@@ -163,6 +186,22 @@
}
}
+ public static boolean isTrivialAggregateSubplan(ILogicalPlan subplan) {
+ if (subplan.getRoots().isEmpty()) {
+ return false;
+ }
+ for (Mutable<ILogicalOperator> rootOpRef : subplan.getRoots()) {
+ ILogicalOperator rootOp = rootOpRef.getValue();
+ if (rootOp.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+ return false;
+ }
+ if (firstChildOfType((AbstractLogicalOperator) rootOp, LogicalOperatorTag.NESTEDTUPLESOURCE) == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<>();
static {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index ce9fd03..a1f819d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.asterix.algebra.operators.physical.BTreeSearchPOperator;
import org.apache.asterix.algebra.operators.physical.InvertedIndexPOperator;
@@ -67,6 +68,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
@@ -365,24 +367,17 @@
LogicalVariable var = ((VariableReferenceExpression) orderExpr).getVariableReference();
orderColumns.add(new OrderColumn(var, p.first.getKind()));
}
- boolean partitionMaterialization = winOp.hasNestedPlans();
- if (!partitionMaterialization) {
- for (Mutable<ILogicalExpression> exprRef : winOp.getExpressions()) {
- ILogicalExpression expr = exprRef.getValue();
- if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
- throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, winOp.getSourceLocation(),
- expr.getExpressionTag());
- }
- AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
- if (BuiltinFunctions.windowFunctionHasProperty(callExpr.getFunctionIdentifier(),
- BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION)) {
- partitionMaterialization = true;
- break;
- }
- }
- }
+
+ boolean partitionMaterialization = winOp.hasNestedPlans() || AnalysisUtil.hasFunctionWithProperty(winOp,
+ BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(),
winOp.getFrameValueExpressions());
- return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic);
+ boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
+ winOp.getFrameValueExpressions());
+ boolean nestedTrivialAggregates = winOp.hasNestedPlans()
+ && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
+
+ return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic,
+ frameEndIsMonotonic, nestedTrivialAggregates);
}
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index ce857ea..81f054a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -1040,11 +1040,11 @@
fs.getName());
}
boolean isWin = BuiltinFunctions.isWindowFunction(fi);
- boolean isWinAgg = isWin
- && BuiltinFunctions.windowFunctionHasProperty(fi, BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG);
- boolean prohibitOrderClause = isWin && BuiltinFunctions.windowFunctionHasProperty(fi,
+ boolean isWinAgg = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
+ BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG);
+ boolean prohibitOrderClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE);
- boolean prohibitFrameClause = isWin && BuiltinFunctions.windowFunctionHasProperty(fi,
+ boolean prohibitFrameClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi,
BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE);
Mutable<ILogicalOperator> currentOpRef = tupSource;
@@ -1313,7 +1313,7 @@
if (fcallExpr.getKind() != AbstractFunctionCallExpression.FunctionKind.STATEFUL) {
throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, fcallExpr.getKind());
}
- if (BuiltinFunctions.windowFunctionHasProperty(fi,
+ if (BuiltinFunctions.builtinFunctionHasProperty(fi,
BuiltinFunctions.WindowFunctionProperty.INJECT_ORDER_ARGS)) {
for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExprListOut) {
fcallExpr.getArguments().add(new MutableObject<>(p.second.getValue().cloneExpression()));
@@ -1575,7 +1575,7 @@
}
AbstractFunctionCallExpression valueExpr =
BuiltinFunctions.makeWindowFunctionExpression(fid, new ArrayList<>());
- if (BuiltinFunctions.windowFunctionHasProperty(valueExpr.getFunctionIdentifier(),
+ if (BuiltinFunctions.builtinFunctionHasProperty(valueExpr.getFunctionIdentifier(),
BuiltinFunctions.WindowFunctionProperty.INJECT_ORDER_ARGS)) {
for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExprList) {
valueExpr.getArguments().add(new MutableObject<>(p.second.getValue().cloneExpression()));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
index 47f2ce9..31228c5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
@@ -37,3 +37,17 @@
min(result_delta) min_delta,
max(result_delta) max_delta
};
+
+create function q2_max_unbounded_preceding_n_following(N) {
+ let
+ DBL_N = N * 2
+ from
+ range(1, DBL_N) x
+ let
+ result_expected = case when x > N then DBL_N else x + N end,
+ result_actual = max(x) over (order by x range between unbounded preceding and N following),
+ result_delta = result_expected - result_actual
+ select
+ min(result_delta) min_delta,
+ max(result_delta) max_delta
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.7.query.sqlpp
new file mode 100644
index 0000000..fab9e42
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.7.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with accumulating frame
+ * : that always covers the whole partition
+ * Expected Res : SUCCESS
+ */
+
+from range(1, 10) t
+select t, sum(t) over(order by t range between unbounded preceding and 1000 following) as `sum`
+order by t
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.8.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.8.query.sqlpp
new file mode 100644
index 0000000..bf21dc3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.8.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with accumulating frame
+ * : and window function with row limit
+ * Expected Res : SUCCESS
+ */
+
+from range(1, 10) t
+select t, first_value(t) over(order by t) as first
+order by t
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
new file mode 100644
index 0000000..ad6913c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with accumulating frame
+ * : that spans several physical frames
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+q2_max_unbounded_preceding_n_following(5000);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.7.adm
new file mode 100644
index 0000000..5f3b610
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.7.adm
@@ -0,0 +1,10 @@
+{ "t": 1, "sum": 55 }
+{ "t": 2, "sum": 55 }
+{ "t": 3, "sum": 55 }
+{ "t": 4, "sum": 55 }
+{ "t": 5, "sum": 55 }
+{ "t": 6, "sum": 55 }
+{ "t": 7, "sum": 55 }
+{ "t": 8, "sum": 55 }
+{ "t": 9, "sum": 55 }
+{ "t": 10, "sum": 55 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.8.adm
new file mode 100644
index 0000000..d6b51d1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.8.adm
@@ -0,0 +1,10 @@
+{ "t": 1, "first": 1 }
+{ "t": 2, "first": 1 }
+{ "t": 3, "first": 1 }
+{ "t": 4, "first": 1 }
+{ "t": 5, "first": 1 }
+{ "t": 6, "first": 1 }
+{ "t": 7, "first": 1 }
+{ "t": 8, "first": 1 }
+{ "t": 9, "first": 1 }
+{ "t": 10, "first": 1 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.9.adm
new file mode 100644
index 0000000..6115ead
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.9.adm
@@ -0,0 +1 @@
+{ "min_delta": 0, "max_delta": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowAggregationSugarVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowAggregationSugarVisitor.java
index a060d57..777f6db 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowAggregationSugarVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowAggregationSugarVisitor.java
@@ -88,7 +88,7 @@
if (winfi != null) {
winExpr.setFunctionSignature(new FunctionSignature(winfi));
rewriteSpecificWindowFunctions(winfi, winExpr);
- if (BuiltinFunctions.windowFunctionHasProperty(winfi,
+ if (BuiltinFunctions.builtinFunctionHasProperty(winfi,
BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG)) {
wrapAggregationArguments(winExpr, 1);
}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowRewriteVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowRewriteVisitor.java
index 3c5032a..42a4282 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowRewriteVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowRewriteVisitor.java
@@ -64,7 +64,7 @@
FunctionSignature signature = winExpr.getFunctionSignature();
FunctionIdentifier winfi = FunctionMapUtil.getInternalWindowFunction(signature);
if (winfi != null) {
- if (BuiltinFunctions.windowFunctionHasProperty(winfi,
+ if (BuiltinFunctions.builtinFunctionHasProperty(winfi,
BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG)) {
List<Expression> newExprList = extractExpressions(winExpr.getExprList(), 1);
if (newExprList == null) {
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java
index 2eddfb5..50b13e2 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java
@@ -219,7 +219,7 @@
FunctionSignature fs = winExpr.getFunctionSignature();
FunctionIdentifier winfi = FunctionMapUtil.getInternalWindowFunction(fs);
if (winfi != null) {
- if (BuiltinFunctions.windowFunctionHasProperty(winfi,
+ if (BuiltinFunctions.builtinFunctionHasProperty(winfi,
BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG)) {
visitWindowExpressionExcludingExprList(winExpr, arg);
List<Expression> exprList = winExpr.getExprList();
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 20fbf63..623cf08 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -157,6 +157,8 @@
private static final Map<IFunctionInfo, IFunctionInfo> builtinPublicFunctionsSet = new HashMap<>();
private static final Map<IFunctionInfo, IFunctionInfo> builtinPrivateFunctionsSet = new HashMap<>();
private static final Map<IFunctionInfo, IResultTypeComputer> funTypeComputer = new HashMap<>();
+ private static final Map<IFunctionInfo, Set<? extends BuiltinFunctionProperty>> builtinFunctionProperties =
+ new HashMap<>();
private static final Set<IFunctionInfo> builtinAggregateFunctions = new HashSet<>();
private static final Map<IFunctionInfo, IFunctionToDataSourceRewriter> datasourceFunctions = new HashMap<>();
private static final Set<IFunctionInfo> similarityFunctions = new HashSet<>();
@@ -170,7 +172,7 @@
private static final Map<IFunctionInfo, IFunctionInfo> distinctToRegularScalarAggregateFunctionMap =
new HashMap<>();
private static final Map<IFunctionInfo, IFunctionInfo> sqlToWindowFunctions = new HashMap<>();
- private static final Map<IFunctionInfo, Set<WindowFunctionProperty>> windowFunctions = new HashMap<>();
+ private static final Set<IFunctionInfo> windowFunctions = new HashSet<>();
private static final Map<IFunctionInfo, SpatialFilterKind> spatialFilterFunctions = new HashMap<>();
@@ -2629,7 +2631,7 @@
addIntermediateAgg(SERIAL_GLOBAL_SQL_SUM, SERIAL_INTERMEDIATE_SQL_SUM);
addGlobalAgg(SERIAL_SQL_SUM, SERIAL_GLOBAL_SQL_SUM);
- // SQL SUM Distinct
+ // SQL SUM DISTINCT
addDistinctAgg(SQL_SUM_DISTINCT, SCALAR_SQL_SUM);
addScalarAgg(SQL_SUM_DISTINCT, SCALAR_SQL_SUM_DISTINCT);
@@ -2641,7 +2643,10 @@
addGlobalAgg(ST_UNION_AGG, ST_UNION_AGG);
}
- public enum WindowFunctionProperty {
+ interface BuiltinFunctionProperty {
+ }
+
+ public enum WindowFunctionProperty implements BuiltinFunctionProperty {
/** Whether the order clause is prohibited */
NO_ORDER_CLAUSE,
/** Whether the frame clause is prohibited */
@@ -2818,6 +2823,21 @@
registeredFunctions.put(fi, functionInfo);
}
+ private static <T extends Enum<T> & BuiltinFunctionProperty> void registerFunctionProperties(IFunctionInfo finfo,
+ Class<T> propertyClass, T[] properties) {
+ if (properties == null) {
+ return;
+ }
+ Set<T> propertySet = EnumSet.noneOf(propertyClass);
+ Collections.addAll(propertySet, properties);
+ builtinFunctionProperties.put(finfo, propertySet);
+ }
+
+ public static boolean builtinFunctionHasProperty(FunctionIdentifier fi, BuiltinFunctionProperty property) {
+ Set<? extends BuiltinFunctionProperty> propertySet = builtinFunctionProperties.get(getAsterixFunctionInfo(fi));
+ return propertySet != null && propertySet.contains(property);
+ }
+
public static void addAgg(FunctionIdentifier fi) {
builtinAggregateFunctions.add(getAsterixFunctionInfo(fi));
}
@@ -2856,10 +2876,9 @@
WindowFunctionProperty... properties) {
IFunctionInfo sqlinfo = getAsterixFunctionInfo(sqlfi);
IFunctionInfo wininfo = getAsterixFunctionInfo(winfi);
- Set<WindowFunctionProperty> propertiesSet = EnumSet.noneOf(WindowFunctionProperty.class);
- Collections.addAll(propertiesSet, properties);
sqlToWindowFunctions.put(sqlinfo, wininfo);
- windowFunctions.put(wininfo, propertiesSet);
+ windowFunctions.add(wininfo);
+ registerFunctionProperties(wininfo, WindowFunctionProperty.class, properties);
}
public static FunctionIdentifier getWindowFunction(FunctionIdentifier sqlfi) {
@@ -2868,12 +2887,7 @@
}
public static boolean isWindowFunction(FunctionIdentifier winfi) {
- return windowFunctions.containsKey(getAsterixFunctionInfo(winfi));
- }
-
- public static boolean windowFunctionHasProperty(FunctionIdentifier winfi, WindowFunctionProperty property) {
- Set<WindowFunctionProperty> propertySet = windowFunctions.get(getAsterixFunctionInfo(winfi));
- return propertySet != null && propertySet.contains(property);
+ return windowFunctions.contains(getAsterixFunctionInfo(winfi));
}
public static AbstractFunctionCallExpression makeWindowFunctionExpression(FunctionIdentifier winfi,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
index bdfdac8..71b5239 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
@@ -327,6 +327,7 @@
* Use {@link #acceptExpressionTransform(ILogicalExpressionReferenceTransform, boolean)}
* to visit only non-requiring expressions.
*/
+ @Override
public boolean requiresVariableReferenceExpressions() {
return false;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 2a8658d..1d8c47c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -58,6 +58,7 @@
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRunningRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansUnboundedRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory;
@@ -77,12 +78,19 @@
private final boolean frameStartIsMonotonic;
+ private final boolean frameEndIsMonotonic;
+
+ private final boolean nestedTrivialAggregates;
+
public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
- List<OrderColumn> orderColumns, boolean frameStartIsMonotonic) {
+ List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, boolean frameEndIsMonotonic,
+ boolean nestedTrivialAggregates) {
this.partitionColumns = partitionColumns;
this.partitionMaterialization = partitionMaterialization;
this.orderColumns = orderColumns;
this.frameStartIsMonotonic = frameStartIsMonotonic;
+ this.frameEndIsMonotonic = frameEndIsMonotonic;
+ this.nestedTrivialAggregates = nestedTrivialAggregates;
}
@Override
@@ -202,7 +210,7 @@
inputSchemas, context);
}
- AbstractWindowRuntimeFactory runtime;
+ AbstractWindowRuntimeFactory runtime = null;
if (winOp.hasNestedPlans()) {
int opSchemaSizePreSubplans = opSchema.getSize();
AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context);
@@ -210,20 +218,35 @@
WindowAggregatorDescriptorFactory nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
- boolean useUnboundedRuntime = frameStartExprList.isEmpty() && frameEndExprList.isEmpty()
- && frameExcludeExprList.isEmpty() && frameOffsetExprEval == null;
- if (useUnboundedRuntime) {
- runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
- partitionComparatorFactories, orderComparatorFactories, winOp.getFrameMaxObjects(),
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory);
- } else {
+ int frameMaxObjects = winOp.getFrameMaxObjects();
+
+ // special cases
+ if (frameStartExprList.isEmpty() && frameExcludeExprList.isEmpty() && frameOffsetExpr == null) {
+ if (frameEndExprList.isEmpty()) {
+ // special case #1: frame == whole partition, no exclusions, no offset
+ runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
+ partitionComparatorFactories, orderComparatorFactories, frameMaxObjects,
+ projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
+ aggregatorOutputSchemaSize, nestedAggFactory);
+ } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
+ // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
+ // trivial aggregate subplan ( aggregate + nts )
+ nestedAggFactory.setPartialOutputEnabled(true);
+ runtime = new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList,
+ partitionComparatorFactories, orderComparatorFactories,
+ frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second,
+ frameEndExprEvals, frameMaxObjects, projectionColumnsExcludingSubplans,
+ runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory);
+ }
+ }
+ // default case
+ if (runtime == null) {
runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
orderComparatorFactories, frameValueExprEvalsAndComparators.first,
frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartIsMonotonic,
frameEndExprEvals, frameExcludeExprEvalsAndComparators.first,
winOp.getFrameExcludeNegationStartIdx(), frameExcludeExprEvalsAndComparators.second,
- frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(),
+ frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), frameMaxObjects,
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
aggregatorOutputSchemaSize, nestedAggFactory);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregatePushRuntime.java
new file mode 100644
index 0000000..c909782
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregatePushRuntime.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+/**
+ * Aggregate operator runtime.
+ * <p>
+ * Every incoming tuple is passed to each aggregate evaluator. When the runtime is closed (or whenever
+ * {@link #finishAggregates(boolean)} is called) one output tuple is produced with one field per evaluator.
+ * The evaluators are created on the first {@link #open()} and re-initialized on every {@link #open()}.
+ */
+public class AggregatePushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+    private final IAggregateEvaluatorFactory[] aggFactories;
+
+    private final IHyracksTaskContext ctx;
+
+    /** Evaluator instances, one per factory; the slots are filled on the first {@link #open()} */
+    private final IAggregateEvaluator[] aggEvals;
+
+    /** Reusable pointable that receives each evaluator's value in {@link #finishAggregates(boolean)} */
+    private final IPointable result;
+
+    /** Reusable builder of the output tuple */
+    private final ArrayTupleBuilder tupleBuilder;
+
+    /** {@code true} until {@link #open()} has been called for the first time */
+    private boolean first;
+
+    AggregatePushRuntime(IAggregateEvaluatorFactory[] aggFactories, IHyracksTaskContext ctx) {
+        this.aggFactories = aggFactories;
+        this.ctx = ctx;
+        aggEvals = new IAggregateEvaluator[aggFactories.length];
+        result = VoidPointable.FACTORY.createPointable();
+        tupleBuilder = new ArrayTupleBuilder(aggEvals.length);
+        first = true;
+    }
+
+    /**
+     * Creates the evaluators on the first call, then (re)initializes all of them and opens the parent runtime.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        if (first) {
+            first = false;
+            initAccessAppendRef(ctx);
+            for (int i = 0; i < aggFactories.length; i++) {
+                aggEvals[i] = aggFactories[i].createAggregateEvaluator(ctx);
+            }
+        }
+        for (IAggregateEvaluator aggEval : aggEvals) {
+            aggEval.init();
+        }
+        super.open();
+    }
+
+    /**
+     * Passes every tuple of the given frame to all aggregate evaluators. Nothing is emitted here.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            tRef.reset(tAccess, t);
+            processTuple(tRef);
+        }
+    }
+
+    /**
+     * Emits the aggregate values and closes the parent runtime. Does nothing if the runtime is not open.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        if (isOpen) {
+            try {
+                finishAggregates(false);
+            } finally {
+                // the parent must be closed even if producing the result failed
+                super.close();
+            }
+        }
+    }
+
+    /**
+     * Asks every evaluator for its current value and appends the resulting tuple to the output frame.
+     * The evaluators are neither re-initialized nor released by this method.
+     *
+     * @param flushFrame
+     *            forwarded unchanged to {@code appendToFrameFromTupleBuilder(ArrayTupleBuilder, boolean)}
+     */
+    public void finishAggregates(boolean flushFrame) throws HyracksDataException {
+        tupleBuilder.reset();
+        for (IAggregateEvaluator aggEval : aggEvals) {
+            aggEval.finish(result);
+            tupleBuilder.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
+        }
+        appendToFrameFromTupleBuilder(tupleBuilder, flushFrame);
+    }
+
+    private void processTuple(FrameTupleReference tupleRef) throws HyracksDataException {
+        for (IAggregateEvaluator aggEval : aggEvals) {
+            aggEval.step(tupleRef);
+        }
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index 1f9cb91..7179c7a 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -18,18 +18,10 @@
*/
package org.apache.hyracks.algebricks.runtime.operators.aggreg;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -59,66 +51,7 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws HyracksDataException {
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private IAggregateEvaluator[] aggregs = new IAggregateEvaluator[aggregFactories.length];
- private IPointable result = VoidPointable.FACTORY.createPointable();
- private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
-
- private boolean first = true;
-
- @Override
- public void open() throws HyracksDataException {
- if (first) {
- first = false;
- initAccessAppendRef(ctx);
- for (int i = 0; i < aggregFactories.length; i++) {
- aggregs[i] = aggregFactories[i].createAggregateEvaluator(ctx);
- }
- }
- for (int i = 0; i < aggregFactories.length; i++) {
- aggregs[i].init();
- }
- super.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- processTuple(tRef);
- }
-
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (isOpen) {
- try {
- computeAggregate();
- appendToFrameFromTupleBuilder(tupleBuilder);
- } finally {
- super.close();
- }
- }
- }
-
- private void computeAggregate() throws HyracksDataException {
- tupleBuilder.reset();
- for (int f = 0; f < aggregs.length; f++) {
- aggregs[f].finish(result);
- tupleBuilder.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
- }
- }
-
- private void processTuple(FrameTupleReference tupleRef) throws HyracksDataException {
- for (int f = 0; f < aggregs.length; f++) {
- aggregs[f].step(tupleRef);
- }
- }
- };
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+ return new AggregatePushRuntime(aggregFactories, ctx);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 0ee7c1d..5fbea5a 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -58,7 +58,7 @@
final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
pipelines[i] =
- (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], outputWriter, ctx);
+ (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], outputWriter, ctx, null);
}
return new IAggregatorDescriptor() {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index d14f5c1..417ad9e 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -68,8 +68,8 @@
IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter;
final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
- pipelines[i] =
- (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], enforcedWriter, ctx);
+ pipelines[i] = (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], enforcedWriter,
+ ctx, null);
}
final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 376a7a1..f5477ec 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -96,7 +96,7 @@
//TODO: refactoring is needed
public static IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer,
- IHyracksTaskContext ctx) throws HyracksDataException {
+ IHyracksTaskContext ctx, Map<IPushRuntimeFactory, IPushRuntime> outRuntimeMap) throws HyracksDataException {
// should enforce protocol
boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
// plug the operators
@@ -104,9 +104,10 @@
IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
- IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx)[0];
- newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
start = enforce ? EnforceFrameWriter.enforce(start) : start;
+ IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
+ IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
+ IPushRuntime newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntimes[0]) : newRuntimes[0];
newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
if (i > 0) {
newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
@@ -114,6 +115,9 @@
// the nts has the same input and output rec. desc.
newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
}
+ if (outRuntimeMap != null) {
+ outRuntimeMap.put(runtimeFactory, newRuntimes[0]);
+ }
start = newRuntime;
}
return start;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 832cb22..059f946 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -61,7 +61,7 @@
@Override
public void flush() throws HyracksDataException {
- writer.flush();
+ appender.flush(writer);
}
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
new file mode 100644
index 0000000..9adeb4d
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+/**
+ * Base class for window runtime implementations that compute nested aggregates
+ */
+abstract class AbstractWindowNestedPlansPushRuntime extends WindowMaterializingPushRuntime {
+
+    /** Added to the projection list length when sizing the output tuple builder */
+    final int nestedAggOutSchemaSize;
+
+    private final WindowAggregatorDescriptorFactory nestedAggFactory;
+
+    /** Created in {@link #init()}; {@code null} until then */
+    private IAggregatorDescriptor nestedAgg;
+
+    AbstractWindowNestedPlansPushRuntime(int[] partitionColumns,
+            IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+            WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+        super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
+                runningAggOutColumns, runningAggFactories, ctx);
+        this.nestedAggFactory = nestedAggFactory;
+        this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
+    }
+
+    @Override
+    protected void init() throws HyracksDataException {
+        super.init();
+        nestedAgg = nestedAggFactory.createAggregator(ctx, null, null, null, null, null, -1);
+    }
+
+    /**
+     * Closes the parent runtime and then the nested aggregator.
+     * The aggregator is closed even if closing the parent fails, and is skipped if it was never created.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            super.close();
+        } finally {
+            // nestedAgg is only assigned in init(), so it is still null if init() has not completed
+            if (nestedAgg != null) {
+                nestedAgg.close();
+            }
+        }
+    }
+
+    @Override
+    protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) {
+        return new ArrayTupleBuilder(projectionList.length + nestedAggOutSchemaSize);
+    }
+
+    /**
+     * Starts a new aggregation in the nested aggregator. All arguments are passed as placeholders because the
+     * aggregator created by
+     * {@link WindowAggregatorDescriptorFactory#createAggregator(IHyracksTaskContext, RecordDescriptor, RecordDescriptor, int[], int[], long)
+     * WindowAggregatorDescriptorFactory.createAggregator(...)}
+     * does not process the argument tuple in init()
+     */
+    void nestedAggInit() throws HyracksDataException {
+        nestedAgg.init(null, null, -1, null);
+    }
+
+    /**
+     * Passes tuple {@code tIndex} of {@code tAccess} to the nested aggregator
+     */
+    void nestedAggAggregate(FrameTupleAccessor tAccess, int tIndex) throws HyracksDataException {
+        nestedAgg.aggregate(tAccess, tIndex, null, -1, null);
+    }
+
+    /**
+     * Makes the nested aggregator write its final result into {@code outTupleBuilder}
+     */
+    void nestedAggOutputFinalResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
+        nestedAgg.outputFinalResult(outTupleBuilder, null, -1, null);
+    }
+
+    /**
+     * Makes the nested aggregator write its current (partial) result into {@code outTupleBuilder}
+     */
+    void nestedAggOutputPartialResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
+        nestedAgg.outputPartialResult(outTupleBuilder, null, -1, null);
+    }
+
+    /**
+     * Creates one scalar evaluator per factory, in the same order
+     */
+    static IScalarEvaluator[] createEvaluators(IScalarEvaluatorFactory[] evalFactories, IHyracksTaskContext ctx)
+            throws HyracksDataException {
+        IScalarEvaluator[] evals = new IScalarEvaluator[evalFactories.length];
+        for (int i = 0; i < evalFactories.length; i++) {
+            evals[i] = evalFactories[i].createScalarEvaluator(ctx);
+        }
+        return evals;
+    }
+
+    /**
+     * Evaluates {@code evals[i]} on {@code inTuple} into field {@code i} of {@code outTuple}
+     */
+    static void evaluate(IScalarEvaluator[] evals, IFrameTupleReference inTuple, PointableTupleReference outTuple)
+            throws HyracksDataException {
+        for (int i = 0; i < evals.length; i++) {
+            evals[i].evaluate(inTuple, outTuple.getField(i));
+        }
+    }
+
+    /**
+     * Creates a tuple reference backed by {@code ln} fresh void pointables
+     */
+    static PointableTupleReference createPointables(int ln) {
+        IPointable[] pointables = new IPointable[ln];
+        for (int i = 0; i < ln; i++) {
+            pointables[i] = VoidPointable.FACTORY.createPointable();
+        }
+        return new PointableTupleReference(pointables);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
new file mode 100644
index 0000000..53857ac
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+/**
+ * Common parent of the window runtime factories whose runtimes evaluate nested aggregates.
+ * It only stores the nested aggregate settings that are shared by those factories.
+ */
+abstract class AbstractWindowNestedPlansRuntimeFactory extends AbstractWindowRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Output schema size of the nested aggregates, as supplied by the caller */
+    final int nestedAggOutSchemaSize;
+
+    /** Factory of the aggregator that evaluates the nested plans */
+    final WindowAggregatorDescriptorFactory nestedAggFactory;
+
+    /**
+     * Forwards the partitioning, ordering, projection and running aggregate settings to the parent factory
+     * and keeps the two nested aggregate settings in this class.
+     */
+    AbstractWindowNestedPlansRuntimeFactory(int[] partitionColumns,
+            IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
+            int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
+            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory) {
+        super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
+                projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+        this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
+        this.nestedAggFactory = nestedAggFactory;
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java
index 6177723..4cf5ec5 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java
@@ -19,7 +19,13 @@
package org.apache.hyracks.algebricks.runtime.operators.win;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.aggreg.AggregatePushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler;
import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -40,12 +46,18 @@
private static final long serialVersionUID = 1L;
- private AlgebricksPipeline[] subplans;
+ private final AlgebricksPipeline[] subplans;
+
+ private boolean partialOutputEnabled;
public WindowAggregatorDescriptorFactory(AlgebricksPipeline[] subplans) {
this.subplans = subplans;
}
+ public void setPartialOutputEnabled(boolean value) {
+ partialOutputEnabled = value;
+ }
+
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys, long memoryBudget)
@@ -53,9 +65,26 @@
NestedPlansAccumulatingAggregatorFactory.AggregatorOutput outputWriter =
new NestedPlansAccumulatingAggregatorFactory.AggregatorOutput(subplans, 0);
NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
+
+ Map<IPushRuntimeFactory, IPushRuntime> pipelineRuntimeMap = partialOutputEnabled ? new HashMap<>() : null;
+ AggregatePushRuntime[] aggs = partialOutputEnabled ? new AggregatePushRuntime[subplans.length] : null;
+
for (int i = 0; i < subplans.length; i++) {
- pipelines[i] =
- (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], outputWriter, ctx);
+ AlgebricksPipeline subplan = subplans[i];
+ if (pipelineRuntimeMap != null) {
+ pipelineRuntimeMap.clear();
+ }
+ pipelines[i] = (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplan, outputWriter, ctx,
+ pipelineRuntimeMap);
+ if (pipelineRuntimeMap != null) {
+ IPushRuntimeFactory[] subplanFactories = subplan.getRuntimeFactories();
+ IPushRuntimeFactory aggFactory = subplanFactories[subplanFactories.length - 1];
+ AggregatePushRuntime agg = (AggregatePushRuntime) pipelineRuntimeMap.get(aggFactory);
+ if (agg == null) {
+ throw new IllegalStateException();
+ }
+ aggs[i] = agg;
+ }
}
return new IAggregatorDescriptor() {
@@ -64,7 +93,6 @@
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
outputWriter.getTupleBuilder().reset();
-
for (NestedTupleSourceRuntime pipeline : pipelines) {
pipeline.open();
}
@@ -91,6 +119,30 @@
return true;
}
+ /**
+ * This method is called when evaluating accumulating frames.
+ * It emits current result of the aggregates but does not close pipelines, so aggregation can continue.
+ * This method may be called several times.
+ * {@link #outputFinalResult(ArrayTupleBuilder, IFrameTupleAccessor, int, AggregateState)}
+ * should be called at the end to emit the last value and close all pipelines
+ */
+ @Override
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ if (aggs == null) {
+ throw new UnsupportedOperationException();
+ }
+ for (int i = 0; i < pipelines.length; i++) {
+ outputWriter.setInputIdx(i);
+ pipelines[i].flush();
+ aggs[i].finishAggregates(true);
+ }
+ memoryUsageCheck();
+ TupleUtils.addFields(outputWriter.getTupleBuilder(), tupleBuilder);
+ outputWriter.getTupleBuilder().reset();
+ return true;
+ }
+
@Override
public AggregateState createAggregateStates() {
return null;
@@ -101,12 +153,6 @@
}
@Override
- public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void close() {
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
index 565cbe6..cb4f534 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -35,21 +35,18 @@
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.DataUtils;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
import org.apache.hyracks.storage.common.MultiComparator;
/**
* Runtime for window operators that performs partition materialization and can evaluate running aggregates
* as well as regular aggregates (in nested plans) over window frames.
*/
-public class WindowNestedPlansPushRuntime extends WindowMaterializingPushRuntime {
+class WindowNestedPlansPushRuntime extends AbstractWindowNestedPlansPushRuntime {
private final boolean frameValueExists;
@@ -109,12 +106,6 @@
private final int frameMaxObjects;
- private final int nestedAggOutSchemaSize;
-
- private final WindowAggregatorDescriptorFactory nestedAggFactory;
-
- private IAggregatorDescriptor nestedAgg;
-
private IFrame copyFrame2;
private IFrame runFrame;
@@ -145,7 +136,7 @@
int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
- runningAggOutColumns, runningAggFactories, ctx);
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
this.frameValueEvalFactories = frameValueEvalFactories;
this.frameValueExists = frameValueEvalFactories != null && frameValueEvalFactories.length > 0;
this.frameStartEvalFactories = frameStartEvalFactories;
@@ -162,8 +153,6 @@
this.frameOffsetEvalFactory = frameOffsetEvalFactory;
this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
this.frameMaxObjects = frameMaxObjects;
- this.nestedAggFactory = nestedAggFactory;
- this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
}
@Override
@@ -195,8 +184,6 @@
bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
}
- nestedAgg = nestedAggFactory.createAggregator(ctx, null, null, null, null, null, -1);
-
runFrame = new VSizeFrame(ctx);
copyFrame2 = new VSizeFrame(ctx);
tAccess2 = new FrameTupleAccessor(inputRecordDesc);
@@ -256,8 +243,7 @@
}
int toWrite = frameMaxObjects;
- // aggregator created by WindowAggregatorDescriptorFactory does not process argument tuple in init()
- nestedAgg.init(null, null, -1, null);
+ nestedAggInit();
int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0;
int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1;
@@ -334,7 +320,7 @@
}
if (toWrite != 0) {
- nestedAgg.aggregate(tAccess2, tIdxInner, null, -1, null);
+ nestedAggAggregate(tAccess2, tIdxInner);
}
if (toWrite > 0) {
toWrite--;
@@ -345,10 +331,11 @@
}
}
- nestedAgg.outputFinalResult(tupleBuilder, null, -1, null);
+ nestedAggOutputFinalResult(tupleBuilder);
appendToFrameFromTupleBuilder(tupleBuilder);
if (frameStartIsMonotonic) {
+ frameStartForward = true;
if (chunkIdxFrameStartLocal >= 0) {
chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal;
tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal;
@@ -381,33 +368,4 @@
}
return true;
}
-
- @Override
- protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) {
- return new ArrayTupleBuilder(projectionList.length + nestedAggOutSchemaSize);
- }
-
- private static IScalarEvaluator[] createEvaluators(IScalarEvaluatorFactory[] evalFactories, IHyracksTaskContext ctx)
- throws HyracksDataException {
- IScalarEvaluator[] evals = new IScalarEvaluator[evalFactories.length];
- for (int i = 0; i < evalFactories.length; i++) {
- evals[i] = evalFactories[i].createScalarEvaluator(ctx);
- }
- return evals;
- }
-
- private static void evaluate(IScalarEvaluator[] evals, IFrameTupleReference inTuple,
- PointableTupleReference outTuple) throws HyracksDataException {
- for (int i = 0; i < evals.length; i++) {
- evals[i].evaluate(inTuple, outTuple.getField(i));
- }
- }
-
- private static PointableTupleReference createPointables(int ln) {
- IPointable[] pointables = new IPointable[ln];
- for (int i = 0; i < ln; i++) {
- pointables[i] = VoidPointable.FACTORY.createPointable();
- }
- return new PointableTupleReference(pointables);
- }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
new file mode 100644
index 0000000..e550d65
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+/**
+ * Optimized runtime for window operators that performs partition materialization and can evaluate running aggregates
+ * as well as regular aggregates (in nested plans) over accumulating window frames (unbounded preceding to current row
+ * or N following). The nested aggregate is initialized once per partition; each row resumes from the saved frame end.
+ */
+class WindowNestedPlansRunningPushRuntime extends AbstractWindowNestedPlansPushRuntime {
+
+ private final IScalarEvaluatorFactory[] frameValueEvalFactories; // evaluators for a scanned tuple's frame value
+
+ private IScalarEvaluator[] frameValueEvals; // created in init()
+
+ private PointableTupleReference frameValuePointables; // results of frameValueEvals
+
+ private final IBinaryComparatorFactory[] frameValueComparatorFactories; // frame value vs frame end
+
+ private MultiComparator frameValueComparators; // created in init()
+
+ private final IScalarEvaluatorFactory[] frameEndEvalFactories; // evaluators for the current row's frame end
+
+ private IScalarEvaluator[] frameEndEvals; // created in init()
+
+ private PointableTupleReference frameEndPointables; // results of frameEndEvals
+
+ private final int frameMaxObjects; // initial value of toWrite for every partition
+
+ private IFrame copyFrame2; // copy of the first chunk's frame (multi-chunk partitions only)
+
+ private IFrame runFrame; // receives chunks other than the first one from the reader
+
+ private int runFrameChunkId; // index of the chunk held in runFrame, -1 if none
+
+ private long runFrameSize; // reader position delta observed when runFrame was loaded
+
+ private FrameTupleAccessor tAccess2; // accessor for the inner (frame) scan
+
+ private FrameTupleReference tRef2; // tuple reference for the inner (frame) scan
+
+ private int chunkIdxFrameEndGlobal; // chunk at which the next row's scan resumes
+
+ private int tBeginIdxFrameEndGlobal; // tuple at which it resumes (< 0: first tuple of the chunk)
+
+ private long readerPosFrameEndGlobal; // reader position to seek to before resuming
+
+ private int toWrite; // tuples that may still be aggregated; a negative value is never decremented
+
+ WindowNestedPlansRunningPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
+ IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories,
+ int frameMaxObjects, int[] projectionColumns, int[] runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+ super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+ this.frameValueEvalFactories = frameValueEvalFactories; // only frame-specific arguments are kept here
+ this.frameEndEvalFactories = frameEndEvalFactories;
+ this.frameValueComparatorFactories = frameValueComparatorFactories;
+ this.frameMaxObjects = frameMaxObjects;
+ }
+
+ @Override
+ protected void init() throws HyracksDataException {
+ super.init();
+
+ frameValueEvals = createEvaluators(frameValueEvalFactories, ctx); // instantiate from the factories
+ frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
+ frameValuePointables = createPointables(frameValueEvalFactories.length); // one slot per evaluator
+ frameEndEvals = createEvaluators(frameEndEvalFactories, ctx);
+ frameEndPointables = createPointables(frameEndEvalFactories.length);
+
+ runFrame = new VSizeFrame(ctx); // frames and accessors below serve the inner (frame) scan
+ copyFrame2 = new VSizeFrame(ctx);
+ tAccess2 = new FrameTupleAccessor(inputRecordDesc);
+ tRef2 = new FrameTupleReference();
+ }
+
+ @Override
+ protected void beginPartitionImpl() throws HyracksDataException {
+ super.beginPartitionImpl();
+ nestedAggInit(); // the only init call in this class: not repeated between rows of the partition
+ chunkIdxFrameEndGlobal = 0; // the first row starts scanning at the first chunk
+ tBeginIdxFrameEndGlobal = -1; // < 0: start at the chunk's first tuple
+ readerPosFrameEndGlobal = 0;
+ runFrameChunkId = -1; // nothing loaded into runFrame yet
+ toWrite = frameMaxObjects;
+ }
+
+ @Override
+ protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
+ long readerPos = -1; // set and restored only when the partition has more than one chunk
+ int nChunks = getPartitionChunkCount();
+ if (nChunks > 1) {
+ readerPos = reader.position();
+ if (chunkIdx == 0) { // keep a copy of the first chunk for use while later chunks are processed
+ ByteBuffer curFrameBuffer = curFrame.getBuffer();
+ int pos = curFrameBuffer.position(); // restored after the copy
+ copyFrame2.ensureFrameSize(curFrameBuffer.capacity());
+ FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer());
+ curFrameBuffer.position(pos);
+ }
+ }
+
+ boolean isLastChunk = chunkIdx == nChunks - 1;
+
+ tAccess.reset(curFrame.getBuffer());
+ int tBeginIdx = getTupleBeginIdx(chunkIdx);
+ int tEndIdx = getTupleEndIdx(chunkIdx);
+ for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) {
+ tRef.reset(tAccess, tIdx);
+
+ // running aggregates
+ produceTuple(tupleBuilder, tAccess, tIdx, tRef);
+
+ // frame end for the current row
+ evaluate(frameEndEvals, tRef, frameEndPointables);
+
+ int chunkIdxInnerStart = chunkIdxFrameEndGlobal;
+ int tBeginIdxInnerStart = tBeginIdxFrameEndGlobal;
+ if (nChunks > 1) {
+ reader.seek(readerPosFrameEndGlobal); // reposition at the saved chunk
+ }
+
+ int chunkIdxFrameEndLocal = -1, tBeginIdxFrameEndLocal = -1;
+ long readerPosFrameEndLocal = -1;
+ // scan chunks starting from the position saved by the previous row
+ frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
+ long readerPosFrameInner;
+ IFrame frameInner;
+ if (chunkIdxInner == 0) {
+ // first chunk's frame is always in memory
+ frameInner = chunkIdx == 0 ? curFrame : copyFrame2;
+ readerPosFrameInner = 0;
+ } else {
+ readerPosFrameInner = reader.position();
+ if (runFrameChunkId == chunkIdxInner) {
+ // runFrame has this chunk, so just advance the reader
+ reader.seek(readerPosFrameInner + runFrameSize);
+ } else {
+ reader.nextFrame(runFrame);
+ runFrameSize = reader.position() - readerPosFrameInner; // lets us skip it later
+ runFrameChunkId = chunkIdxInner;
+ }
+ frameInner = runFrame;
+ }
+ tAccess2.reset(frameInner.getBuffer());
+
+ int tBeginIdxInner;
+ if (tBeginIdxInnerStart < 0) {
+ tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
+ } else {
+ tBeginIdxInner = tBeginIdxInnerStart;
+ tBeginIdxInnerStart = -1; // applies to the first scanned chunk only
+ }
+ int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
+ // aggregate tuples until one lies beyond the frame end or toWrite drops to 0
+ for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner && toWrite != 0; tIdxInner++) {
+ tRef2.reset(tAccess2, tIdxInner);
+
+ evaluate(frameValueEvals, tRef2, frameValuePointables);
+ if (frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
+ // value > end: this is the first tuple beyond the frame end. save its position,
+ // the scan continues from it in the next outer iteration
+ chunkIdxFrameEndLocal = chunkIdxInner;
+ tBeginIdxFrameEndLocal = tIdxInner;
+ readerPosFrameEndLocal = readerPosFrameInner;
+
+ // stop scanning; this tuple is not aggregated for the current row
+ break frame_loop;
+ }
+
+ nestedAggAggregate(tAccess2, tIdxInner);
+
+ if (toWrite > 0) { // a negative toWrite is left as is
+ toWrite--;
+ }
+ }
+ }
+
+ boolean isLastTuple = isLastChunk && tIdx == tEndIdx; // last row of the partition
+ if (isLastTuple) {
+ nestedAggOutputFinalResult(tupleBuilder);
+ } else {
+ nestedAggOutputPartialResult(tupleBuilder);
+ }
+ appendToFrameFromTupleBuilder(tupleBuilder);
+
+ if (chunkIdxFrameEndLocal >= 0) {
+ chunkIdxFrameEndGlobal = chunkIdxFrameEndLocal;
+ tBeginIdxFrameEndGlobal = tBeginIdxFrameEndLocal;
+ readerPosFrameEndGlobal = readerPosFrameEndLocal;
+ } else {
+ // could not find the end, set beyond the last chunk
+ chunkIdxFrameEndGlobal = nChunks;
+ tBeginIdxFrameEndGlobal = 0;
+ readerPosFrameEndGlobal = 0;
+ }
+ }
+
+ if (nChunks > 1) { // put the reader back at the position it had on entry
+ reader.seek(readerPos);
+ }
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
new file mode 100644
index 0000000..ddeaf2b
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+/**
+ * Factory for the optimized window operator runtime that performs partition materialization and can evaluate running
+ * aggregates as well as regular aggregates (in nested plans) over accumulating window frames (unbounded preceding to
+ * current row or N following). Instances are created as {@link WindowNestedPlansRunningPushRuntime}.
+ */
+public class WindowNestedPlansRunningRuntimeFactory extends AbstractWindowNestedPlansRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+ // the fields below are handed unchanged to WindowNestedPlansRunningPushRuntime
+ private final IScalarEvaluatorFactory[] frameValueEvalFactories;
+
+ private final IBinaryComparatorFactory[] frameValueComparatorFactories;
+
+ private final IScalarEvaluatorFactory[] frameEndEvalFactories;
+
+ private final int frameMaxObjects;
+
+ public WindowNestedPlansRunningRuntimeFactory(int[] partitionColumns,
+ IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
+ IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories,
+ int frameMaxObjects, int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory) {
+ super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
+ projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
+ nestedAggFactory);
+ this.frameValueEvalFactories = frameValueEvalFactories; // only frame-specific arguments are kept here
+ this.frameValueComparatorFactories = frameValueComparatorFactories;
+ this.frameEndEvalFactories = frameEndEvalFactories;
+ this.frameMaxObjects = frameMaxObjects;
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+ return new WindowNestedPlansRunningPushRuntime(partitionColumns, partitionComparatorFactories,
+ orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories, frameEndEvalFactories,
+ frameMaxObjects, projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
+ nestedAggFactory, ctx); // arguments not declared in this class are inherited fields
+ }
+
+ @Override
+ public String toString() { // window [nested-running] (<partition cols>) <out cols> := <running aggs>
+ return "window [nested-running] (" + Arrays.toString(partitionColumns) + ") "
+ + Arrays.toString(runningAggOutColumns) + " := " + Arrays.toString(runningAggFactories);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
index 16591d5..f754b91 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
@@ -32,20 +32,20 @@
* Runtime factory for window operators that performs partition materialization and can evaluate running aggregates
* as well as regular aggregates (in nested plans) over window frames.
*/
-public class WindowNestedPlansRuntimeFactory extends AbstractWindowRuntimeFactory {
+public class WindowNestedPlansRuntimeFactory extends AbstractWindowNestedPlansRuntimeFactory {
private static final long serialVersionUID = 1L;
private final IScalarEvaluatorFactory[] frameValueEvalFactories;
+ private final IBinaryComparatorFactory[] frameValueComparatorFactories;
+
private final IScalarEvaluatorFactory[] frameStartEvalFactories;
private final boolean frameStartIsMonotonic;
private final IScalarEvaluatorFactory[] frameEndEvalFactories;
- private final IBinaryComparatorFactory[] frameValueComparatorFactories;
-
private final IScalarEvaluatorFactory[] frameExcludeEvalFactories;
private final int frameExcludeNegationStartIdx;
@@ -58,10 +58,6 @@
private final int frameMaxObjects;
- private final int nestedAggOutSchemaSize;
-
- private final WindowAggregatorDescriptorFactory nestedAggFactory;
-
public WindowNestedPlansRuntimeFactory(int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
@@ -74,20 +70,19 @@
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
WindowAggregatorDescriptorFactory nestedAggFactory) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+ projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
+ nestedAggFactory);
this.frameValueEvalFactories = frameValueEvalFactories;
+ this.frameValueComparatorFactories = frameValueComparatorFactories;
this.frameStartEvalFactories = frameStartEvalFactories;
this.frameStartIsMonotonic = frameStartIsMonotonic;
this.frameEndEvalFactories = frameEndEvalFactories;
- this.frameValueComparatorFactories = frameValueComparatorFactories;
this.frameExcludeEvalFactories = frameExcludeEvalFactories;
this.frameExcludeComparatorFactories = frameExcludeComparatorFactories;
this.frameExcludeNegationStartIdx = frameExcludeNegationStartIdx;
this.frameOffsetEvalFactory = frameOffsetEvalFactory;
this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
this.frameMaxObjects = frameMaxObjects;
- this.nestedAggFactory = nestedAggFactory;
- this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
index 4ceda1e..b25a36c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
@@ -29,7 +29,6 @@
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
/**
* Optimized runtime for window operators that performs partition materialization and can evaluate running aggregates
@@ -44,13 +43,7 @@
* <li>no frame offset</li>
* </ul>
*/
-public class WindowNestedPlansUnboundedPushRuntime extends WindowMaterializingPushRuntime {
-
- private final int nestedAggOutSchemaSize;
-
- private final WindowAggregatorDescriptorFactory nestedAggFactory;
-
- private IAggregatorDescriptor nestedAgg;
+class WindowNestedPlansUnboundedPushRuntime extends AbstractWindowNestedPlansPushRuntime {
private ArrayTupleBuilder nestedAggOutputBuilder;
@@ -64,24 +57,20 @@
int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
- runningAggOutColumns, runningAggFactories, ctx);
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
this.frameMaxObjects = frameMaxObjects;
- this.nestedAggFactory = nestedAggFactory;
- this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
}
@Override
protected void init() throws HyracksDataException {
super.init();
- nestedAgg = nestedAggFactory.createAggregator(ctx, null, null, null, null, null, -1);
nestedAggOutputBuilder = new ArrayTupleBuilder(nestedAggOutSchemaSize);
}
@Override
protected void beginPartitionImpl() throws HyracksDataException {
super.beginPartitionImpl();
- // aggregator created by WindowAggregatorDescriptorFactory does not process argument tuple in init()
- nestedAgg.init(null, null, -1, null);
+ nestedAggInit();
nestedAggOutputBuilder.reset();
toWrite = frameMaxObjects;
}
@@ -92,7 +81,7 @@
super.partitionChunkImpl(frameId, frameBuffer, tBeginIdx, tEndIdx);
tAccess.reset(frameBuffer);
for (int t = tBeginIdx; t <= tEndIdx && toWrite != 0; t++) {
- nestedAgg.aggregate(tAccess, t, null, -1, null);
+ nestedAggAggregate(tAccess, t);
if (toWrite > 0) {
toWrite--;
}
@@ -101,7 +90,7 @@
@Override
protected void endPartitionImpl() throws HyracksDataException {
- nestedAgg.outputFinalResult(nestedAggOutputBuilder, null, -1, null);
+ nestedAggOutputFinalResult(nestedAggOutputBuilder);
super.endPartitionImpl();
}
@@ -111,9 +100,4 @@
super.produceTuple(tb, accessor, tIndex, tupleRef);
TupleUtils.addFields(nestedAggOutputBuilder, tb);
}
-
- @Override
- protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) {
- return new ArrayTupleBuilder(projectionList.length + nestedAggOutSchemaSize);
- }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
index f89a8e5..0f7d9cf 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
@@ -30,16 +30,12 @@
* Optimized runtime for window operators that performs partition materialization and can evaluate running aggregates
* as well as regular aggregates (in nested plans) over <b>unbounded</b> window frames.
*/
-public final class WindowNestedPlansUnboundedRuntimeFactory extends AbstractWindowRuntimeFactory {
+public final class WindowNestedPlansUnboundedRuntimeFactory extends AbstractWindowNestedPlansRuntimeFactory {
private static final long serialVersionUID = 1L;
private final int frameMaxObjects;
- private final int nestedAggOutSchemaSize;
-
- private final WindowAggregatorDescriptorFactory nestedAggFactory;
-
public WindowNestedPlansUnboundedRuntimeFactory(int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int frameMaxObjects,
@@ -47,10 +43,9 @@
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
WindowAggregatorDescriptorFactory nestedAggFactory) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+ projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
+ nestedAggFactory);
this.frameMaxObjects = frameMaxObjects;
- this.nestedAggFactory = nestedAggFactory;
- this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
}
@Override