[NO ISSUE][COMP][RT] Handle type mismatch in window frame boundaries
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- If window frame boundary is defined as N preceding/following
and the incoming value is not of a numeric or temporal type
then assume that the frame is empty for this value because
its boundaries cannot be computed
- Add tests for NULL/MISSING/complex types for window operator
- Fix typo in the name resolution documentation
Change-Id: I4dc1b010674eb9a8b679039dc68c81163d156956
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3342
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index 5d1d690..86b2b88 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -328,9 +328,15 @@
for (Mutable<ILogicalExpression> me : op.getFrameStartExpressions()) {
sweepExpression(me.getValue());
}
+ for (Mutable<ILogicalExpression> me : op.getFrameStartValidationExpressions()) {
+ sweepExpression(me.getValue());
+ }
for (Mutable<ILogicalExpression> me : op.getFrameEndExpressions()) {
sweepExpression(me.getValue());
}
+ for (Mutable<ILogicalExpression> me : op.getFrameEndValidationExpressions()) {
+ sweepExpression(me.getValue());
+ }
for (Mutable<ILogicalExpression> me : op.getFrameExcludeExpressions()) {
sweepExpression(me.getValue());
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index 65d59c7..a6b9f59 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -96,6 +96,7 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -1073,7 +1074,9 @@
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprListOut = Collections.emptyList();
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprRefs = null;
List<Mutable<ILogicalExpression>> frameStartExprRefs = null;
+ List<Mutable<ILogicalExpression>> frameStartValidationExprRefs = null;
List<Mutable<ILogicalExpression>> frameEndExprRefs = null;
+ List<Mutable<ILogicalExpression>> frameEndValidationExprRefs = null;
List<Mutable<ILogicalExpression>> frameExcludeExprRefs = null;
int frameExcludeNotStartIdx = -1;
@@ -1230,20 +1233,24 @@
currentOpRef = new MutableObject<>(helperWinOp);
}
- Pair<List<Mutable<ILogicalExpression>>, ILogicalOperator> frameStartResult = translateWindowBoundary(
- winFrameStartKind, winFrameStartExpr, frameValueExprRefs, orderExprListOut, currentOpRef);
+ Triple<ILogicalOperator, List<Mutable<ILogicalExpression>>, List<Mutable<ILogicalExpression>>> frameStartResult =
+ translateWindowBoundary(winFrameStartKind, winFrameStartExpr, frameValueExprRefs, orderExprListOut,
+ currentOpRef);
if (frameStartResult != null) {
- frameStartExprRefs = frameStartResult.first;
- if (frameStartResult.second != null) {
- currentOpRef = new MutableObject<>(frameStartResult.second);
+ frameStartExprRefs = frameStartResult.second;
+ frameStartValidationExprRefs = frameStartResult.third;
+ if (frameStartResult.first != null) {
+ currentOpRef = new MutableObject<>(frameStartResult.first);
}
}
- Pair<List<Mutable<ILogicalExpression>>, ILogicalOperator> frameEndResult = translateWindowBoundary(
- winFrameEndKind, winFrameEndExpr, frameValueExprRefs, orderExprListOut, currentOpRef);
+ Triple<ILogicalOperator, List<Mutable<ILogicalExpression>>, List<Mutable<ILogicalExpression>>> frameEndResult =
+ translateWindowBoundary(winFrameEndKind, winFrameEndExpr, frameValueExprRefs, orderExprListOut,
+ currentOpRef);
if (frameEndResult != null) {
- frameEndExprRefs = frameEndResult.first;
- if (frameEndResult.second != null) {
- currentOpRef = new MutableObject<>(frameEndResult.second);
+ frameEndExprRefs = frameEndResult.second;
+ frameEndValidationExprRefs = frameEndResult.third;
+ if (frameEndResult.first != null) {
+ currentOpRef = new MutableObject<>(frameEndResult.first);
}
}
}
@@ -1257,8 +1264,8 @@
}
WindowOperator winOp = new WindowOperator(partExprListOut, orderExprListOut, frameValueExprRefs,
- frameStartExprRefs, frameEndExprRefs, frameExcludeExprRefs, frameExcludeNotStartIdx, frameOffsetExpr,
- winFrameMaxOjbects);
+ frameStartExprRefs, frameStartValidationExprRefs, frameEndExprRefs, frameEndValidationExprRefs,
+ frameExcludeExprRefs, frameExcludeNotStartIdx, frameOffsetExpr, winFrameMaxOjbects);
winOp.setSourceLocation(sourceLoc);
LogicalVariable runningAggResultVar = null, nestedAggResultVar = null;
@@ -1465,7 +1472,7 @@
return true;
}
- private Pair<List<Mutable<ILogicalExpression>>, ILogicalOperator> translateWindowBoundary(
+ private Triple<ILogicalOperator, List<Mutable<ILogicalExpression>>, List<Mutable<ILogicalExpression>>> translateWindowBoundary(
WindowExpression.FrameBoundaryKind boundaryKind, Expression boundaryExpr,
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> valueExprs,
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprList,
@@ -1481,15 +1488,17 @@
for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : valueExprs) {
resultExprs.add(new MutableObject<>(p.second.getValue().cloneExpression()));
}
- return new Pair<>(resultExprs, null);
+ return new Triple<>(null, resultExprs, null);
case BOUNDED_PRECEDING:
OperatorType opTypePreceding = valueExprs.get(0).first.getKind() == OrderOperator.IOrder.OrderKind.ASC
? OperatorType.MINUS : OperatorType.PLUS;
- return translateWindowBoundaryExpr(boundaryExpr, valueExprs, tupSource, opTypePreceding);
+ return translateWindowBoundaryExpr(boundaryExpr, valueExprs, tupSource, opTypePreceding,
+ BuiltinFunctions.IS_NUMERIC_ADD_COMPATIBLE);
case BOUNDED_FOLLOWING:
OperatorType opTypeFollowing = valueExprs.get(0).first.getKind() == OrderOperator.IOrder.OrderKind.ASC
? OperatorType.PLUS : OperatorType.MINUS;
- return translateWindowBoundaryExpr(boundaryExpr, valueExprs, tupSource, opTypeFollowing);
+ return translateWindowBoundaryExpr(boundaryExpr, valueExprs, tupSource, opTypeFollowing,
+ BuiltinFunctions.IS_NUMERIC_ADD_COMPATIBLE);
case UNBOUNDED_PRECEDING:
case UNBOUNDED_FOLLOWING:
return null;
@@ -1499,14 +1508,19 @@
}
}
- private Pair<List<Mutable<ILogicalExpression>>, ILogicalOperator> translateWindowBoundaryExpr(
+ private Triple<ILogicalOperator, List<Mutable<ILogicalExpression>>, List<Mutable<ILogicalExpression>>> translateWindowBoundaryExpr(
Expression boundaryExpr, List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> valueExprs,
- Mutable<ILogicalOperator> tupSource, OperatorType boundaryOperator) throws CompilationException {
- SourceLocation sourceLoc = boundaryExpr.getSourceLocation();
+ Mutable<ILogicalOperator> tupSource, OperatorType boundaryOperator, FunctionIdentifier validationFunction)
+ throws CompilationException {
if (valueExprs.size() != 1) {
- throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, valueExprs.size());
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, boundaryExpr.getSourceLocation(),
+ valueExprs.size());
}
ILogicalExpression valueExpr = valueExprs.get(0).second.getValue();
+ SourceLocation sourceLoc = valueExpr.getSourceLocation();
+
+ AbstractFunctionCallExpression validationExpr = createFunctionCallExpression(validationFunction, sourceLoc);
+ validationExpr.getArguments().add(new MutableObject<>(valueExpr.cloneExpression()));
AbstractFunctionCallExpression resultExpr =
createFunctionCallExpressionForBuiltinOperator(boundaryOperator, sourceLoc);
@@ -1522,7 +1536,8 @@
VariableReferenceExpression resultVarRefExpr = new VariableReferenceExpression(resultVar);
resultVarRefExpr.setSourceLocation(sourceLoc);
- return new Pair<>(mkSingletonArrayList(new MutableObject<>(resultVarRefExpr)), assignOp);
+ return new Triple<>(assignOp, mkSingletonArrayList(new MutableObject<>(resultVarRefExpr)),
+ mkSingletonArrayList(new MutableObject<>(validationExpr)));
}
private Pair<List<Mutable<ILogicalExpression>>, Integer> translateWindowExclusion(
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_null_missing/win_null_missing.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_null_missing/win_null_missing.1.query.sqlpp
new file mode 100644
index 0000000..7a3cd3f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_null_missing/win_null_missing.1.query.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : PARTITION BY NULL/MISSING/complex
+ * Expected Res : SUCCESS
+ */
+
+from [
+ { "y": 10 },
+ { "y": 20 },
+ { "x": null, "y": 10 },
+ { "x": null, "y": 20 },
+ { "x": 1, "y": 10 },
+ { "x": 1, "y": 20 },
+ { "x": "a", "y": 10 },
+ { "x": "a", "y": 20 },
+ { "x": [ "b" ], "y": 10 },
+ { "x": [ "b" ], "y": 20 },
+ { "x": { "c": 1 }, "y": 10 },
+ { "x": { "c": 1 }, "y": 20 }
+] t
+select x, y, sum(y) over (partition by x) w
+order by x, y
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_null_missing/win_null_missing.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_null_missing/win_null_missing.2.query.sqlpp
new file mode 100644
index 0000000..3853d2e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_null_missing/win_null_missing.2.query.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : ORDER BY MISSING/NULL/complex
+ * Expected Res : SUCCESS
+ */
+
+from [
+ { "y": "m" },
+ { "x": null, "y": "n" },
+ { "x": 1, "y": "i" },
+ { "x": "a", "y": "s" },
+ { "x": [ "b" ], "y": "a" },
+ { "x": { "c": 1 }, "y": "o" }
+] t
+select
+ nth_value(y, 0) over (order by x rows between unbounded preceding and unbounded following) w0,
+ nth_value(y, 1) over (order by x rows between unbounded preceding and unbounded following) w1,
+ nth_value(y, 2) over (order by x rows between unbounded preceding and unbounded following) w2,
+ nth_value(y, 3) over (order by x rows between unbounded preceding and unbounded following) w3,
+ nth_value(y, 4) over (order by x rows between unbounded preceding and unbounded following) w4,
+ nth_value(y, 5) over (order by x rows between unbounded preceding and unbounded following) w5,
+ nth_value(y, 6) over (order by x rows between unbounded preceding and unbounded following) w6,
+ x, y
+order by x, y
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_null_missing/win_null_missing.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_null_missing/win_null_missing.3.query.sqlpp
new file mode 100644
index 0000000..869c827
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_null_missing/win_null_missing.3.query.sqlpp
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : RANGE when value is NULL/MISSING/complex
+ * Expected Res : SUCCESS
+ */
+
+from [
+ { "y": 1 },
+ { "y": 2 },
+ { "x": null, "y": 3 },
+ { "x": null, "y": 4 },
+ { "x": 1, "y": 5 },
+ { "x": 1, "y": 6 },
+ { "x": "a", "y": 7 },
+ { "x": "a", "y": 8 },
+ { "x": [ "b" ], "y": 9 },
+ { "x": [ "b" ], "y": 10 },
+ { "x": { "c": 1 }, "y": 11 },
+ { "x": { "c": 1 }, "y": 12 }
+] t
+select
+ count(y) over (order by x range between unbounded preceding and current row) w1,
+ count(y) over (order by x range between 0 preceding and current row) w2,
+ count(y) over (order by x range between current row and current row) w3,
+
+ count(y) over (order by x range between unbounded preceding and 0 following) w4,
+ count(y) over (order by x range between 0 preceding and 0 following) w5,
+ count(y) over (order by x range between current row and 0 following) w6,
+
+ count(y) over (order by x range between unbounded preceding and unbounded following) w7,
+ count(y) over (order by x range between 0 preceding and unbounded following) w8,
+ count(y) over (order by x range between current row and unbounded following) w9,
+
+ x, y
+order by x, y
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_null_missing/win_null_missing.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_null_missing/win_null_missing.1.adm
new file mode 100644
index 0000000..9fc3986
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_null_missing/win_null_missing.1.adm
@@ -0,0 +1,12 @@
+{ "y": 10, "w": 30 }
+{ "y": 20, "w": 30 }
+{ "x": null, "y": 10, "w": 30 }
+{ "x": null, "y": 20, "w": 30 }
+{ "x": 1, "y": 10, "w": 30 }
+{ "x": 1, "y": 20, "w": 30 }
+{ "x": "a", "y": 10, "w": 30 }
+{ "x": "a", "y": 20, "w": 30 }
+{ "x": [ "b" ], "y": 10, "w": 30 }
+{ "x": [ "b" ], "y": 20, "w": 30 }
+{ "x": { "c": 1 }, "y": 10, "w": 30 }
+{ "x": { "c": 1 }, "y": 20, "w": 30 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_null_missing/win_null_missing.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_null_missing/win_null_missing.2.adm
new file mode 100644
index 0000000..456c327
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_null_missing/win_null_missing.2.adm
@@ -0,0 +1,6 @@
+{ "w0": "m", "w1": "m", "w2": "n", "w3": "i", "w4": "s", "w5": "a", "w6": "o", "y": "m" }
+{ "w0": "m", "w1": "m", "w2": "n", "w3": "i", "w4": "s", "w5": "a", "w6": "o", "x": null, "y": "n" }
+{ "w0": "m", "w1": "m", "w2": "n", "w3": "i", "w4": "s", "w5": "a", "w6": "o", "x": 1, "y": "i" }
+{ "w0": "m", "w1": "m", "w2": "n", "w3": "i", "w4": "s", "w5": "a", "w6": "o", "x": "a", "y": "s" }
+{ "w0": "m", "w1": "m", "w2": "n", "w3": "i", "w4": "s", "w5": "a", "w6": "o", "x": [ "b" ], "y": "a" }
+{ "w0": "m", "w1": "m", "w2": "n", "w3": "i", "w4": "s", "w5": "a", "w6": "o", "x": { "c": 1 }, "y": "o" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_null_missing/win_null_missing.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_null_missing/win_null_missing.3.adm
new file mode 100644
index 0000000..1f09627
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_null_missing/win_null_missing.3.adm
@@ -0,0 +1,12 @@
+{ "w1": 2, "w2": 2, "w3": 2, "w4": 2, "w5": 2, "w6": 2, "w7": 12, "w8": 12, "w9": 12, "y": 1 }
+{ "w1": 2, "w2": 2, "w3": 2, "w4": 2, "w5": 2, "w6": 2, "w7": 12, "w8": 12, "w9": 12, "y": 2 }
+{ "w1": 4, "w2": 2, "w3": 2, "w4": 4, "w5": 2, "w6": 2, "w7": 12, "w8": 10, "w9": 10, "x": null, "y": 3 }
+{ "w1": 4, "w2": 2, "w3": 2, "w4": 4, "w5": 2, "w6": 2, "w7": 12, "w8": 10, "w9": 10, "x": null, "y": 4 }
+{ "w1": 6, "w2": 2, "w3": 2, "w4": 6, "w5": 2, "w6": 2, "w7": 12, "w8": 8, "w9": 8, "x": 1, "y": 5 }
+{ "w1": 6, "w2": 2, "w3": 2, "w4": 6, "w5": 2, "w6": 2, "w7": 12, "w8": 8, "w9": 8, "x": 1, "y": 6 }
+{ "w1": 8, "w2": 0, "w3": 2, "w4": 0, "w5": 0, "w6": 0, "w7": 12, "w8": 0, "w9": 6, "x": "a", "y": 7 }
+{ "w1": 8, "w2": 0, "w3": 2, "w4": 0, "w5": 0, "w6": 0, "w7": 12, "w8": 0, "w9": 6, "x": "a", "y": 8 }
+{ "w1": 10, "w2": 0, "w3": 2, "w4": 0, "w5": 0, "w6": 0, "w7": 12, "w8": 0, "w9": 4, "x": [ "b" ], "y": 9 }
+{ "w1": 10, "w2": 0, "w3": 2, "w4": 0, "w5": 0, "w6": 0, "w7": 12, "w8": 0, "w9": 4, "x": [ "b" ], "y": 10 }
+{ "w1": 12, "w2": 0, "w3": 2, "w4": 0, "w5": 0, "w6": 0, "w7": 12, "w8": 0, "w9": 2, "x": { "c": 1 }, "y": 11 }
+{ "w1": 12, "w2": 0, "w3": 2, "w4": 0, "w5": 0, "w6": 0, "w7": 12, "w8": 0, "w9": 2, "x": { "c": 1 }, "y": 12 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 74bb8a8..edfa544 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -10115,6 +10115,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="window">
+ <compilation-unit name="win_null_missing">
+ <output-dir compare="Text">win_null_missing</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="window">
<compilation-unit name="win_opt_01">
<output-dir compare="Text">win_opt_01</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-doc/src/main/markdown/sqlpp/appendix_3_resolution.md b/asterixdb/asterix-doc/src/main/markdown/sqlpp/appendix_3_resolution.md
index 6bde7ce..1f0c62b 100644
--- a/asterixdb/asterix-doc/src/main/markdown/sqlpp/appendix_3_resolution.md
+++ b/asterixdb/asterix-doc/src/main/markdown/sqlpp/appendix_3_resolution.md
@@ -258,7 +258,7 @@
For example, if the whole query is `ARRAY_COUNT(a.b)` then `a.b` will be treated as dataset `b` contained in
dataverse `a`.
Note that this rule only applies to identifiers which are located directly inside a standalone expression.
- Identifiers inside SELECT statements in a standalone expresion are still resolved according to Rules 1-3.
+ Identifiers inside SELECT statements in a standalone expression are still resolved according to Rules 1-3.
For example, if the whole query is `ARRAY_SUM( (FROM employee AS e SELECT VALUE salary) )` then `salary` is resolved
as `e.salary` following the "Single Variable Rule" (Rule 2.2).
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index e2ba1e1..df2a868 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -1529,6 +1529,8 @@
public static final FunctionIdentifier TREAT_AS_INTEGER =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "treat-as-integer", 1);
+ public static final FunctionIdentifier IS_NUMERIC_ADD_COMPATIBLE =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "is-numeric-add-compatibe", 1);
public static final FunctionIdentifier EXTERNAL_LOOKUP =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "external-lookup", FunctionIdentifier.VARARGS);
@@ -1730,6 +1732,7 @@
addFunction(TO_STRING, AStringTypeComputer.INSTANCE, true);
addPrivateFunction(TREAT_AS_INTEGER, TreatAsTypeComputer.INSTANCE_INTEGER, true);
+ addPrivateFunction(IS_NUMERIC_ADD_COMPATIBLE, BooleanOnlyTypeComputer.INSTANCE, true);
addFunction(IF_INF, IfNanOrInfTypeComputer.INSTANCE, true);
addFunction(IF_MISSING, IfMissingTypeComputer.INSTANCE, true);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/IsNumericAddCompatibleDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/IsNumericAddCompatibleDescriptor.java
new file mode 100644
index 0000000..831ec09
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/IsNumericAddCompatibleDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.evaluators.functions;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.evaluators.common.AbstractTypeCheckEvaluator;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Returns {@code TRUE} if the argument type is one of the types that are allowed on the left side of
+ * {@link BuiltinFunctions#NUMERIC_ADD numeric-add()}:
+ * <ul>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#AMISSING MISSING}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#ANULL NULL}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#AINT8 TINYINT}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#AINT16 SMALLINT}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#AINT32 INTEGER}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#AINT64 BIGINT}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#AFLOAT FLOAT}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#ADOUBLE DOUBLE}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#ADATE DATE}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#ADATETIME DATETIME}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#ATIME TIME}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#ADURATION DURATION}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#AYEARMONTHDURATION YEARMONTHDURATION}</li>
+ * <li>{@link org.apache.asterix.om.types.BuiltinType#ADAYTIMEDURATION DAYTIMEDURATION}</li>
+ * </ul>
+ *
+ * Returns {@code FALSE} for all other types
+ *
+ * @see NumericAddDescriptor
+ * @see AbstractNumericArithmeticEval
+ */
+public class IsNumericAddCompatibleDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new IsNumericAddCompatibleDescriptor();
+ }
+ };
+
+ @Override
+ public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+ return new IScalarEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new AbstractTypeCheckEvaluator(args[0].createScalarEvaluator(ctx)) {
+ @Override
+ protected Value isMatch(byte typeTag) {
+ ATypeTag tt = ATypeTag.VALUE_TYPE_MAPPING[typeTag];
+ switch (tt) {
+ case MISSING:
+ case NULL:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case DATE:
+ case DATETIME:
+ case TIME:
+ case DURATION:
+ case YEARMONTHDURATION:
+ case DAYTIMEDURATION:
+ return Value.TRUE;
+ default:
+ return Value.FALSE;
+ }
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.IS_NUMERIC_ADD_COMPATIBLE;
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 488ee76..15ece9c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -339,6 +339,7 @@
import org.apache.asterix.runtime.evaluators.functions.IsMissingDescriptor;
import org.apache.asterix.runtime.evaluators.functions.IsNullDescriptor;
import org.apache.asterix.runtime.evaluators.functions.IsNumberDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.IsNumericAddCompatibleDescriptor;
import org.apache.asterix.runtime.evaluators.functions.IsObjectDescriptor;
import org.apache.asterix.runtime.evaluators.functions.IsStringDescriptor;
import org.apache.asterix.runtime.evaluators.functions.IsSystemNullDescriptor;
@@ -837,6 +838,8 @@
fc.add(CurrentTimeDescriptor.FACTORY);
fc.add(CurrentDateTimeDescriptor.FACTORY);
+ fc.add(IsNumericAddCompatibleDescriptor.FACTORY);
+
// functions that need generated class for null-handling.
// Element accessors.
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
index 71b5239..aa1ef0a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
@@ -47,7 +47,9 @@
* <li>{@link #frameValueExpressions} - value expressions for comparing against frame start / end boundaries and frame exclusion.
* Each must be a variable reference</li>
* <li>{@link #frameStartExpressions} - frame start boundary</li>
+ * <li>{@link #frameStartValidationExpressions} - frame start boundary validators</li>
* <li>{@link #frameEndExpressions} - frame end boundary</li>
+ * <li>{@link #frameEndValidationExpressions} - frame end boundary validators</li>
* <li>{@link #frameExcludeExpressions} - define values to be excluded from the frame</li>
* <li>{@link #frameOffset} - sets how many tuples to skip inside each frame</li>
* <li>{@link #frameMaxObjects} - limits number of tuples to be returned for each frame</li>
@@ -67,8 +69,12 @@
private final List<Mutable<ILogicalExpression>> frameStartExpressions;
+ private final List<Mutable<ILogicalExpression>> frameStartValidationExpressions;
+
private final List<Mutable<ILogicalExpression>> frameEndExpressions;
+ private final List<Mutable<ILogicalExpression>> frameEndValidationExpressions;
+
private final List<Mutable<ILogicalExpression>> frameExcludeExpressions;
private int frameExcludeNegationStartIdx;
@@ -83,14 +89,16 @@
public WindowOperator(List<Mutable<ILogicalExpression>> partitionExpressions,
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions) {
- this(partitionExpressions, orderExpressions, null, null, null, null, -1, null, -1);
+ this(partitionExpressions, orderExpressions, null, null, null, null, null, null, -1, null, -1);
}
public WindowOperator(List<Mutable<ILogicalExpression>> partitionExpressions,
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExpressions,
List<Mutable<ILogicalExpression>> frameStartExpressions,
+ List<Mutable<ILogicalExpression>> frameStartValidationExpressions,
List<Mutable<ILogicalExpression>> frameEndExpressions,
+ List<Mutable<ILogicalExpression>> frameEndValidationExpressions,
List<Mutable<ILogicalExpression>> frameExcludeExpressions, int frameExcludeNegationStartIdx,
ILogicalExpression frameOffset, int frameMaxObjects) {
this.partitionExpressions = new ArrayList<>();
@@ -109,10 +117,18 @@
if (frameStartExpressions != null) {
this.frameStartExpressions.addAll(frameStartExpressions);
}
+ this.frameStartValidationExpressions = new ArrayList<>();
+ if (frameStartValidationExpressions != null) {
+ this.frameStartValidationExpressions.addAll(frameStartValidationExpressions);
+ }
this.frameEndExpressions = new ArrayList<>();
if (frameEndExpressions != null) {
this.frameEndExpressions.addAll(frameEndExpressions);
}
+ this.frameEndValidationExpressions = new ArrayList<>();
+ if (frameEndValidationExpressions != null) {
+ this.frameEndValidationExpressions.addAll(frameEndValidationExpressions);
+ }
this.frameExcludeExpressions = new ArrayList<>();
if (frameExcludeExpressions != null) {
this.frameExcludeExpressions.addAll(frameExcludeExpressions);
@@ -128,11 +144,14 @@
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExpressions,
List<Mutable<ILogicalExpression>> frameStartExpressions,
+ List<Mutable<ILogicalExpression>> frameStartValidationExpressions,
List<Mutable<ILogicalExpression>> frameEndExpressions,
+ List<Mutable<ILogicalExpression>> frameEndValidationExpressions,
List<Mutable<ILogicalExpression>> frameExcludeExpressions, int frameExcludeNegationStartIdx,
ILogicalExpression frameOffset, int frameMaxObjects, List<LogicalVariable> variables,
List<Mutable<ILogicalExpression>> expressions, List<ILogicalPlan> nestedPlans) {
- this(partitionExpressions, orderExpressions, frameValueExpressions, frameStartExpressions, frameEndExpressions,
+ this(partitionExpressions, orderExpressions, frameValueExpressions, frameStartExpressions,
+ frameStartValidationExpressions, frameEndExpressions, frameEndValidationExpressions,
frameExcludeExpressions, frameExcludeNegationStartIdx, frameOffset, frameMaxObjects);
if (variables != null) {
this.variables.addAll(variables);
@@ -166,10 +185,18 @@
return frameStartExpressions;
}
+ public List<Mutable<ILogicalExpression>> getFrameStartValidationExpressions() {
+ return frameStartValidationExpressions;
+ }
+
public List<Mutable<ILogicalExpression>> getFrameEndExpressions() {
return frameEndExpressions;
}
+ public List<Mutable<ILogicalExpression>> getFrameEndValidationExpressions() {
+ return frameEndValidationExpressions;
+ }
+
public List<Mutable<ILogicalExpression>> getFrameExcludeExpressions() {
return frameExcludeExpressions;
}
@@ -245,9 +272,15 @@
for (Mutable<ILogicalExpression> expr : frameStartExpressions) {
mod |= visitor.transform(expr);
}
+ for (Mutable<ILogicalExpression> expr : frameStartValidationExpressions) {
+ mod |= visitor.transform(expr);
+ }
for (Mutable<ILogicalExpression> expr : frameEndExpressions) {
mod |= visitor.transform(expr);
}
+ for (Mutable<ILogicalExpression> expr : frameEndValidationExpressions) {
+ mod |= visitor.transform(expr);
+ }
for (Mutable<ILogicalExpression> excludeExpr : frameExcludeExpressions) {
mod |= visitor.transform(excludeExpr);
}
@@ -307,9 +340,15 @@
for (Mutable<ILogicalExpression> expr : frameStartExpressions) {
expr.getValue().getUsedVariables(vars);
}
+ for (Mutable<ILogicalExpression> expr : frameStartValidationExpressions) {
+ expr.getValue().getUsedVariables(vars);
+ }
for (Mutable<ILogicalExpression> expr : frameEndExpressions) {
expr.getValue().getUsedVariables(vars);
}
+ for (Mutable<ILogicalExpression> expr : frameEndValidationExpressions) {
+ expr.getValue().getUsedVariables(vars);
+ }
for (Mutable<ILogicalExpression> excludeExpr : frameExcludeExpressions) {
excludeExpr.getValue().getUsedVariables(vars);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index c33c647..5e5f18f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -651,7 +651,11 @@
public static boolean compareWindowFrameSpec(WindowOperator winOp1, WindowOperator winOp2) {
return compareIOrderAndExpressions(winOp1.getFrameValueExpressions(), winOp2.getFrameValueExpressions())
&& compareExpressions(winOp1.getFrameStartExpressions(), winOp2.getFrameStartExpressions())
+ && compareExpressions(winOp1.getFrameStartValidationExpressions(),
+ winOp2.getFrameStartValidationExpressions())
&& compareExpressions(winOp1.getFrameEndExpressions(), winOp2.getFrameEndExpressions())
+ && compareExpressions(winOp1.getFrameEndValidationExpressions(),
+ winOp2.getFrameEndValidationExpressions())
&& compareExpressions(winOp1.getFrameExcludeExpressions(), winOp2.getFrameExcludeExpressions())
&& winOp1.getFrameExcludeNegationStartIdx() == winOp2.getFrameExcludeNegationStartIdx()
&& Objects.equals(winOp1.getFrameOffset().getValue(), winOp2.getFrameOffset().getValue())
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 8ab23c2..99e852d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -622,8 +622,12 @@
deepCopyOrderExpressionReferencePairList(op.getFrameValueExpressions());
List<Mutable<ILogicalExpression>> frameStartExprCopy =
exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getFrameStartExpressions());
+ List<Mutable<ILogicalExpression>> frameStartValidationExprCopy =
+ exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getFrameStartValidationExpressions());
List<Mutable<ILogicalExpression>> frameEndExprCopy =
exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getFrameEndExpressions());
+ List<Mutable<ILogicalExpression>> frameEndValidationExprCopy =
+ exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getFrameEndValidationExpressions());
List<Mutable<ILogicalExpression>> frameExclusionExprCopy =
exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getFrameExcludeExpressions());
ILogicalExpression frameOffsetCopy = exprDeepCopyVisitor.deepCopy(op.getFrameOffset().getValue());
@@ -632,8 +636,9 @@
exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions());
List<ILogicalPlan> nestedPlansCopy = new ArrayList<>();
WindowOperator opCopy = new WindowOperator(partitionExprCopy, orderExprCopy, frameValueExprCopy,
- frameStartExprCopy, frameEndExprCopy, frameExclusionExprCopy, op.getFrameExcludeNegationStartIdx(),
- frameOffsetCopy, op.getFrameMaxObjects(), varCopy, exprCopy, nestedPlansCopy);
+ frameStartExprCopy, frameStartValidationExprCopy, frameEndExprCopy, frameEndValidationExprCopy,
+ frameExclusionExprCopy, op.getFrameExcludeNegationStartIdx(), frameOffsetCopy, op.getFrameMaxObjects(),
+ varCopy, exprCopy, nestedPlansCopy);
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
deepCopyPlanList(op.getNestedPlans(), nestedPlansCopy, opCopy);
return opCopy;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index cc0879e..5be91cc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -418,8 +418,12 @@
deepCopyOrderAndExpression(op.getFrameValueExpressions());
List<Mutable<ILogicalExpression>> newFrameStartExprs = new ArrayList<>();
deepCopyExpressionRefs(newFrameStartExprs, op.getFrameStartExpressions());
+ List<Mutable<ILogicalExpression>> newFrameStartValidationExprs = new ArrayList<>();
+ deepCopyExpressionRefs(newFrameStartValidationExprs, op.getFrameStartValidationExpressions());
List<Mutable<ILogicalExpression>> newFrameEndExprs = new ArrayList<>();
deepCopyExpressionRefs(newFrameEndExprs, op.getFrameEndExpressions());
+ List<Mutable<ILogicalExpression>> newFrameEndValidationExprs = new ArrayList<>();
+ deepCopyExpressionRefs(newFrameEndValidationExprs, op.getFrameEndValidationExpressions());
List<Mutable<ILogicalExpression>> newFrameExclusionExprs = new ArrayList<>();
deepCopyExpressionRefs(newFrameExclusionExprs, op.getFrameExcludeExpressions());
ILogicalExpression newFrameOffset = deepCopyExpressionRef(op.getFrameOffset()).getValue();
@@ -429,8 +433,9 @@
deepCopyExpressionRefs(newExpressions, op.getExpressions());
List<ILogicalPlan> newNestedPlans = new ArrayList<>();
WindowOperator newWinOp = new WindowOperator(newPartitionExprs, newOrderExprs, newFrameValueExprs,
- newFrameStartExprs, newFrameEndExprs, newFrameExclusionExprs, op.getFrameExcludeNegationStartIdx(),
- newFrameOffset, op.getFrameMaxObjects(), newVariables, newExpressions, newNestedPlans);
+ newFrameStartExprs, newFrameStartValidationExprs, newFrameEndExprs, newFrameEndValidationExprs,
+ newFrameExclusionExprs, op.getFrameExcludeNegationStartIdx(), newFrameOffset, op.getFrameMaxObjects(),
+ newVariables, newExpressions, newNestedPlans);
for (ILogicalPlan nestedPlan : op.getNestedPlans()) {
newNestedPlans.add(OperatorManipulationUtil.deepCopy(nestedPlan, newWinOp));
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index e9f82ef..550a208 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -517,9 +517,15 @@
for (Mutable<ILogicalExpression> expr : op.getFrameStartExpressions()) {
expr.getValue().substituteVar(pair.first, pair.second);
}
+ for (Mutable<ILogicalExpression> expr : op.getFrameStartValidationExpressions()) {
+ expr.getValue().substituteVar(pair.first, pair.second);
+ }
for (Mutable<ILogicalExpression> expr : op.getFrameEndExpressions()) {
expr.getValue().substituteVar(pair.first, pair.second);
}
+ for (Mutable<ILogicalExpression> expr : op.getFrameEndValidationExpressions()) {
+ expr.getValue().substituteVar(pair.first, pair.second);
+ }
for (Mutable<ILogicalExpression> expr : op.getFrameExcludeExpressions()) {
expr.getValue().substituteVar(pair.first, pair.second);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index ae6ab07..39b9689 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -480,9 +480,15 @@
for (Mutable<ILogicalExpression> exprRef : op.getFrameStartExpressions()) {
exprRef.getValue().getUsedVariables(usedVariables);
}
+ for (Mutable<ILogicalExpression> exprRef : op.getFrameStartValidationExpressions()) {
+ exprRef.getValue().getUsedVariables(usedVariables);
+ }
for (Mutable<ILogicalExpression> exprRef : op.getFrameEndExpressions()) {
exprRef.getValue().getUsedVariables(usedVariables);
}
+ for (Mutable<ILogicalExpression> exprRef : op.getFrameEndValidationExpressions()) {
+ exprRef.getValue().getUsedVariables(usedVariables);
+ }
for (Mutable<ILogicalExpression> exprRef : op.getFrameExcludeExpressions()) {
exprRef.getValue().getUsedVariables(usedVariables);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index 269a809..d13c51a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -118,21 +118,14 @@
@Override
public String toString() {
- if (orderProp == null) {
- if (topK != -1) {
- // A topK value is introduced.
- return getOperatorTag().toString() + " [topK: " + topK + "]";
- } else {
- return getOperatorTag().toString();
- }
- } else {
- if (topK != -1) {
- // A topK value is introduced.
- return getOperatorTag().toString() + " [topK: " + topK + "]" + " " + orderProp;
- } else {
- return getOperatorTag().toString() + " " + orderProp;
- }
+ StringBuilder out = new StringBuilder();
+ out.append(getOperatorTag());
+ if (topK != -1) {
+ out.append(" [topK: ").append(topK).append(']');
}
+ if (orderProp != null) {
+ out.append(' ').append(orderProp);
+ }
+ return out.toString();
}
-
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index f5c66df..8bd4610 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -178,10 +178,18 @@
IScalarEvaluatorFactory[] frameStartExprEvals =
createEvaluatorFactories(frameStartExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+ List<Mutable<ILogicalExpression>> frameStartValidationExprList = winOp.getFrameStartValidationExpressions();
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals = createEvaluatorFactories(frameStartValidationExprList,
+ inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
List<Mutable<ILogicalExpression>> frameEndExprList = winOp.getFrameEndExpressions();
IScalarEvaluatorFactory[] frameEndExprEvals =
createEvaluatorFactories(frameEndExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+ List<Mutable<ILogicalExpression>> frameEndValidationExprList = winOp.getFrameEndValidationExpressions();
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals = createEvaluatorFactories(frameEndValidationExprList,
+ inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList =
winOp.getFrameValueExpressions();
Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameValueExprEvalsAndComparators =
@@ -239,7 +247,8 @@
runtime = new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList,
partitionComparatorFactories, orderComparatorFactories,
frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second,
- frameEndExprEvals, frameMaxObjects, projectionColumnsExcludingSubplans,
+ frameEndExprEvals, frameEndValidationExprEvals, frameMaxObjects,
+ context.getBinaryBooleanInspectorFactory(), projectionColumnsExcludingSubplans,
runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory,
memSizeInFrames);
}
@@ -248,10 +257,11 @@
if (runtime == null) {
runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
orderComparatorFactories, frameValueExprEvalsAndComparators.first,
- frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartIsMonotonic,
- frameEndExprEvals, frameExcludeExprEvalsAndComparators.first,
- winOp.getFrameExcludeNegationStartIdx(), frameExcludeExprEvalsAndComparators.second,
- frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), frameMaxObjects,
+ frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartValidationExprEvals,
+ frameStartIsMonotonic, frameEndExprEvals, frameEndValidationExprEvals,
+ frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(),
+ frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, frameMaxObjects,
+ context.getBinaryBooleanInspectorFactory(), context.getBinaryIntegerInspectorFactory(),
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 62b935b..27d4ced 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.core.algebra.prettyprint;
+import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
@@ -497,10 +498,16 @@
if (op.hasNestedPlans()) {
buffer.append(" frame on ");
pprintOrderList(op.getFrameValueExpressions(), indent);
- buffer.append("start ");
+ buffer.append(" start ");
List<Mutable<ILogicalExpression>> frameStartExpressions = op.getFrameStartExpressions();
if (!frameStartExpressions.isEmpty()) {
pprintExprList(frameStartExpressions, indent);
+ List<Mutable<ILogicalExpression>> frameStartValidationExpressions =
+ op.getFrameStartValidationExpressions();
+ if (!frameStartValidationExpressions.isEmpty()) {
+ buffer.append(" if ");
+ pprintExprList(frameStartValidationExpressions, indent);
+ }
} else {
buffer.append("unbounded");
}
@@ -508,6 +515,11 @@
List<Mutable<ILogicalExpression>> frameEndExpressions = op.getFrameEndExpressions();
if (!frameEndExpressions.isEmpty()) {
pprintExprList(frameEndExpressions, indent);
+ List<Mutable<ILogicalExpression>> frameEndValidationExpressions = op.getFrameEndValidationExpressions();
+ if (!frameEndValidationExpressions.isEmpty()) {
+ buffer.append(" if ");
+ pprintExprList(frameEndValidationExpressions, indent);
+ }
} else {
buffer.append("unbounded");
}
@@ -594,11 +606,15 @@
buffer.append("]");
}
- protected void pprintOrderList(List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderList,
+ private void pprintOrderList(List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderList,
Integer indent) throws AlgebricksException {
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderList) {
+ for (Iterator<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> i = orderList.iterator(); i.hasNext();) {
+ Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p = i.next();
String fst = getOrderString(p.first);
- buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
+ buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ")");
+ if (i.hasNext()) {
+ buffer.append(' ');
+ }
}
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 9e44774..f5ff12f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -695,6 +695,14 @@
buffer.append("\n");
addIndent(indent).append("}");
}
+ List<Mutable<ILogicalExpression>> frameStartValidationExpressions = op.getFrameStartValidationExpressions();
+ if (!frameStartValidationExpressions.isEmpty()) {
+ buffer.append(",\n");
+ addIndent(indent).append("\"frame-start-if\": {\n");
+ pprintExprList(frameStartValidationExpressions, fldIndent);
+ buffer.append("\n");
+ addIndent(indent).append("}");
+ }
List<Mutable<ILogicalExpression>> frameEndExpressions = op.getFrameEndExpressions();
if (!frameEndExpressions.isEmpty()) {
buffer.append(",\n");
@@ -703,6 +711,14 @@
buffer.append("\n");
addIndent(indent).append("}");
}
+ List<Mutable<ILogicalExpression>> frameEndValidationExpressions = op.getFrameEndValidationExpressions();
+ if (!frameEndValidationExpressions.isEmpty()) {
+ buffer.append(",\n");
+ addIndent(indent).append("\"frame-end-if\": {\n");
+ pprintExprList(frameEndValidationExpressions, fldIndent);
+ buffer.append("\n");
+ addIndent(indent).append("}");
+ }
List<Mutable<ILogicalExpression>> frameExcludeExpressions = op.getFrameExcludeExpressions();
if (!frameExcludeExpressions.isEmpty()) {
buffer.append(",\n");
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index f50ca3a..1c761fa 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -614,11 +614,21 @@
stringBuilder.append(") frame start (");
printExprList(frameStartExpressions);
}
+ List<Mutable<ILogicalExpression>> frameStartValidationExpressions = op.getFrameStartValidationExpressions();
+ if (!frameStartValidationExpressions.isEmpty()) {
+ stringBuilder.append(") if (");
+ printExprList(frameStartValidationExpressions);
+ }
List<Mutable<ILogicalExpression>> frameEndExpressions = op.getFrameEndExpressions();
if (!frameEndExpressions.isEmpty()) {
stringBuilder.append(") frame end (");
printExprList(frameEndExpressions);
}
+ List<Mutable<ILogicalExpression>> frameEndValidationExpressions = op.getFrameEndValidationExpressions();
+ if (!frameEndValidationExpressions.isEmpty()) {
+ stringBuilder.append(") if (");
+ printExprList(frameEndValidationExpressions);
+ }
List<Mutable<ILogicalExpression>> frameExcludeExpressions = op.getFrameExcludeExpressions();
if (!frameExcludeExpressions.isEmpty()) {
stringBuilder.append(") frame exclude (");
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java
index 7fa903c..3aa9152 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java
@@ -122,7 +122,9 @@
setAll(winOpTo.getNestedPlans(), winOpFrom.getNestedPlans());
setAll(winOpTo.getFrameValueExpressions(), winOpFrom.getFrameValueExpressions());
setAll(winOpTo.getFrameStartExpressions(), winOpFrom.getFrameStartExpressions());
+ setAll(winOpTo.getFrameStartValidationExpressions(), winOpFrom.getFrameStartValidationExpressions());
setAll(winOpTo.getFrameEndExpressions(), winOpFrom.getFrameEndExpressions());
+ setAll(winOpTo.getFrameEndValidationExpressions(), winOpFrom.getFrameEndValidationExpressions());
setAll(winOpTo.getFrameExcludeExpressions(), winOpFrom.getFrameExcludeExpressions());
winOpTo.setFrameExcludeNegationStartIdx(winOpFrom.getFrameExcludeNegationStartIdx());
winOpTo.getFrameOffset().setValue(winOpFrom.getFrameOffset().getValue());
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
index 807f6bf..85585a0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.algebricks.runtime.operators.win;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -33,7 +34,6 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
/**
* Base class for window runtime implementations that compute nested aggregates
@@ -44,7 +44,7 @@
private final WindowAggregatorDescriptorFactory nestedAggFactory;
- private IAggregatorDescriptor nestedAgg;
+ private IWindowAggregatorDescriptor nestedAgg;
AbstractWindowNestedPlansPushRuntime(int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories,
@@ -61,7 +61,7 @@
@Override
protected void init() throws HyracksDataException {
super.init();
- nestedAgg = nestedAggFactory.createAggregator(ctx, null, null, null, null, null, -1);
+ nestedAgg = nestedAggCreate();
}
@Override
@@ -75,6 +75,10 @@
return new ArrayTupleBuilder(projectionList.length + nestedAggOutSchemaSize);
}
+ final IWindowAggregatorDescriptor nestedAggCreate() throws HyracksDataException {
+ return nestedAggFactory.createAggregator(ctx, null, null, null, null, -1);
+ }
+
/**
* Aggregator created by
* {@link WindowAggregatorDescriptorFactory#createAggregator(IHyracksTaskContext, RecordDescriptor,
@@ -82,19 +86,46 @@
* does not process argument tuple in init()
*/
final void nestedAggInit() throws HyracksDataException {
+ nestedAggInit(nestedAgg);
+ }
+
+ static void nestedAggInit(IWindowAggregatorDescriptor nestedAgg) throws HyracksDataException {
nestedAgg.init(null, null, -1, null);
}
final void nestedAggAggregate(FrameTupleAccessor tAccess, int tIndex) throws HyracksDataException {
+ nestedAggAggregate(nestedAgg, tAccess, tIndex);
+ }
+
+ static void nestedAggAggregate(IWindowAggregatorDescriptor nestedAgg, FrameTupleAccessor tAccess, int tIndex)
+ throws HyracksDataException {
nestedAgg.aggregate(tAccess, tIndex, null, -1, null);
}
final void nestedAggOutputFinalResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
+ nestedAggOutputFinalResult(nestedAgg, outTupleBuilder);
+ }
+
+ static void nestedAggOutputFinalResult(IWindowAggregatorDescriptor nestedAgg, ArrayTupleBuilder outTupleBuilder)
+ throws HyracksDataException {
nestedAgg.outputFinalResult(outTupleBuilder, null, -1, null);
}
final void nestedAggOutputPartialResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
- nestedAgg.outputPartialResult(outTupleBuilder, null, -1, null);
+ nestedAggOutputPartialResult(nestedAgg, outTupleBuilder);
+ }
+
+ static boolean nestedAggOutputPartialResult(IWindowAggregatorDescriptor nestedAgg,
+ ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
+ return nestedAgg.outputPartialResult(outTupleBuilder, null, -1, null);
+ }
+
+ final void nestAggDiscardFinalResult() throws HyracksDataException {
+ nestAggDiscardFinalResult(nestedAgg);
+ }
+
+ static void nestAggDiscardFinalResult(IWindowAggregatorDescriptor nestedAgg) throws HyracksDataException {
+ nestedAgg.discardFinalResult();
}
static IScalarEvaluator[] createEvaluators(IScalarEvaluatorFactory[] evalFactories, IHyracksTaskContext ctx)
@@ -120,4 +151,16 @@
}
return new PointableTupleReference(pointables);
}
+
+ static boolean allTrue(PointableTupleReference tupleRef, IBinaryBooleanInspector boolAccessor)
+ throws HyracksDataException {
+ for (int i = 0, ln = tupleRef.getFieldCount(); i < ln; i++) {
+ IPointable field = tupleRef.getField(i);
+ boolean b = boolAccessor.getBooleanValue(field.getByteArray(), field.getStartOffset(), field.getLength());
+ if (!b) {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/IWindowAggregatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/IWindowAggregatorDescriptor.java
new file mode 100644
index 0000000..b95cd1f
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/IWindowAggregatorDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.std.group.AggregateState;
+import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+interface IWindowAggregatorDescriptor extends IAggregatorDescriptor {
+ /**
+ * This method is called when evaluating accumulating frames.
+ * It closes the pipelines but does not emit the result.
+ * The assumption is that the result had been already emitted by
+ * {@link #outputPartialResult(ArrayTupleBuilder, IFrameTupleAccessor, int, AggregateState)}
+ */
+ void discardFinalResult() throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java
index 4cf5ec5..0357c59 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java
@@ -37,7 +37,6 @@
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.AggregateState;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
/**
* Aggregator factory for window operators
@@ -59,7 +58,7 @@
}
@Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+ public IWindowAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys, long memoryBudget)
throws HyracksDataException {
NestedPlansAccumulatingAggregatorFactory.AggregatorOutput outputWriter =
@@ -87,7 +86,7 @@
}
}
- return new IAggregatorDescriptor() {
+ return new IWindowAggregatorDescriptor() {
@Override
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
@@ -110,13 +109,17 @@
@Override
public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
int tIndex, AggregateState state) throws HyracksDataException {
+ closePipelines();
+ memoryUsageCheck();
+ TupleUtils.addFields(outputWriter.getTupleBuilder(), tupleBuilder);
+ return true;
+ }
+
+ private void closePipelines() throws HyracksDataException {
for (int i = 0; i < pipelines.length; i++) {
outputWriter.setInputIdx(i);
pipelines[i].close();
}
- memoryUsageCheck();
- TupleUtils.addFields(outputWriter.getTupleBuilder(), tupleBuilder);
- return true;
}
/**
@@ -144,6 +147,11 @@
}
@Override
+ public void discardFinalResult() throws HyracksDataException {
+ closePipelines();
+ }
+
+ @Override
public AggregateState createAggregateStates() {
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
index aa9d402..cb86c5f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.algebricks.runtime.operators.win;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
@@ -72,6 +74,14 @@
private PointableTupleReference frameStartPointables;
+ private final boolean frameStartValidationExists;
+
+ private final IScalarEvaluatorFactory[] frameStartValidationEvalFactories;
+
+ private IScalarEvaluator[] frameStartValidationEvals;
+
+ private PointableTupleReference frameStartValidationPointables;
+
private final boolean frameStartIsMonotonic;
private final boolean frameEndExists;
@@ -82,6 +92,14 @@
private PointableTupleReference frameEndPointables;
+ private final boolean frameEndValidationExists;
+
+ private final IScalarEvaluatorFactory[] frameEndValidationEvalFactories;
+
+ private IScalarEvaluator[] frameEndValidationEvals;
+
+ private PointableTupleReference frameEndValidationPointables;
+
private final boolean frameExcludeExists;
private final IScalarEvaluatorFactory[] frameExcludeEvalFactories;
@@ -106,16 +124,20 @@
private IPointable frameOffsetPointable;
- private final IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
-
private final int frameMaxObjects;
+ private final IBinaryBooleanInspectorFactory booleanAccessorFactory;
+
+ private IBinaryBooleanInspector booleanAccessor;
+
+ private final IBinaryIntegerInspectorFactory integerAccessorFactory;
+
+ private IBinaryIntegerInspector integerAccessor;
+
private FrameTupleAccessor tAccess2;
private FrameTupleReference tRef2;
- private IBinaryIntegerInspector bii;
-
private int chunkIdxFrameStartGlobal;
private int tBeginIdxFrameStartGlobal;
@@ -123,13 +145,15 @@
WindowNestedPlansPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories,
- boolean frameStartIsMonotonic, IScalarEvaluatorFactory[] frameEndEvalFactories,
+ IScalarEvaluatorFactory[] frameStartValidationEvalFactories, boolean frameStartIsMonotonic,
+ IScalarEvaluatorFactory[] frameEndEvalFactories, IScalarEvaluatorFactory[] frameEndValidationEvalFactories,
IScalarEvaluatorFactory[] frameExcludeEvalFactories, int frameExcludeNegationStartIdx,
IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory,
- IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects, int[] projectionColumns,
- int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
- int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx,
- int memSizeInFrames, SourceLocation sourceLoc) {
+ int frameMaxObjects, IBinaryBooleanInspectorFactory booleanAccessorFactory,
+ IBinaryIntegerInspectorFactory integerAccessorFactory, int[] projectionColumns, int[] runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx, int memSizeInFrames,
+ SourceLocation sourceLoc) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
memSizeInFrames, sourceLoc);
@@ -137,9 +161,15 @@
this.frameValueExists = frameValueEvalFactories != null && frameValueEvalFactories.length > 0;
this.frameStartEvalFactories = frameStartEvalFactories;
this.frameStartExists = frameStartEvalFactories != null && frameStartEvalFactories.length > 0;
+ this.frameStartValidationEvalFactories = frameStartValidationEvalFactories;
+ this.frameStartValidationExists =
+ frameStartValidationEvalFactories != null && frameStartValidationEvalFactories.length > 0;
this.frameStartIsMonotonic = frameStartExists && frameStartIsMonotonic;
this.frameEndEvalFactories = frameEndEvalFactories;
this.frameEndExists = frameEndEvalFactories != null && frameEndEvalFactories.length > 0;
+ this.frameEndValidationEvalFactories = frameEndValidationEvalFactories;
+ this.frameEndValidationExists =
+ frameEndValidationEvalFactories != null && frameEndValidationEvalFactories.length > 0;
this.frameValueComparatorFactories = frameValueComparatorFactories;
this.frameExcludeEvalFactories = frameExcludeEvalFactories;
this.frameExcludeExists = frameExcludeEvalFactories != null && frameExcludeEvalFactories.length > 0;
@@ -147,8 +177,9 @@
this.frameExcludeNegationStartIdx = frameExcludeNegationStartIdx;
this.frameOffsetExists = frameOffsetEvalFactory != null;
this.frameOffsetEvalFactory = frameOffsetEvalFactory;
- this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
this.frameMaxObjects = frameMaxObjects;
+ this.booleanAccessorFactory = booleanAccessorFactory;
+ this.integerAccessorFactory = integerAccessorFactory;
}
@Override
@@ -163,10 +194,18 @@
frameStartEvals = createEvaluators(frameStartEvalFactories, ctx);
frameStartPointables = createPointables(frameStartEvalFactories.length);
}
+ if (frameStartValidationExists) {
+ frameStartValidationEvals = createEvaluators(frameStartValidationEvalFactories, ctx);
+ frameStartValidationPointables = createPointables(frameStartValidationEvalFactories.length);
+ }
if (frameEndExists) {
frameEndEvals = createEvaluators(frameEndEvalFactories, ctx);
frameEndPointables = createPointables(frameEndEvalFactories.length);
}
+ if (frameEndValidationExists) {
+ frameEndValidationEvals = createEvaluators(frameEndValidationEvalFactories, ctx);
+ frameEndValidationPointables = createPointables(frameEndValidationEvalFactories.length);
+ }
if (frameExcludeExists) {
frameExcludeEvals = createEvaluators(frameExcludeEvalFactories, ctx);
frameExcludeComparators = createBinaryComparators(frameExcludeComparatorFactories);
@@ -176,7 +215,10 @@
if (frameOffsetExists) {
frameOffsetEval = frameOffsetEvalFactory.createScalarEvaluator(ctx);
frameOffsetPointable = VoidPointable.FACTORY.createPointable();
- bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
+ integerAccessor = integerAccessorFactory.createBinaryIntegerInspector(ctx);
+ }
+ if (frameStartValidationExists || frameEndValidationExists) {
+ booleanAccessor = booleanAccessorFactory.createBinaryBooleanInspector(ctx);
}
tAccess2 = new FrameTupleAccessor(inputRecordDesc);
tRef2 = new FrameTupleReference();
@@ -208,111 +250,129 @@
// running aggregates
produceTuple(tupleBuilder, tAccess, tIdx, tRef);
- // frame boundaries
- if (frameStartExists) {
- evaluate(frameStartEvals, tRef, frameStartPointables);
- }
- if (frameEndExists) {
- evaluate(frameEndEvals, tRef, frameEndPointables);
- }
- if (frameExcludeExists) {
- evaluate(frameExcludeEvals, tRef, frameExcludePointables);
- }
- int toSkip = 0;
- if (frameOffsetExists) {
- frameOffsetEval.evaluate(tRef, frameOffsetPointable);
- toSkip = bii.getIntegerValue(frameOffsetPointable.getByteArray(), frameOffsetPointable.getStartOffset(),
- frameOffsetPointable.getLength());
- }
- int toWrite = frameMaxObjects;
-
+ // nested aggregates
nestedAggInit();
- boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
- int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0;
- int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1;
-
- if (chunkIdxInnerStart < nChunks) {
- if (frameStartForward && !isFirstTupleInPartition) {
- partitionReader.restorePosition(FRAME_POSITION_SLOT);
- } else {
- partitionReader.rewind();
+ // frame boundaries
+ boolean frameStartValid = true;
+ if (frameStartExists) {
+ if (frameStartValidationExists) {
+ evaluate(frameStartValidationEvals, tRef, frameStartValidationPointables);
+ frameStartValid = allTrue(frameStartValidationPointables, booleanAccessor);
+ }
+ if (frameStartValid) {
+ evaluate(frameStartEvals, tRef, frameStartPointables);
+ }
+ }
+ boolean frameEndValid = true;
+ if (frameEndExists) {
+ if (frameEndValidationExists) {
+ evaluate(frameEndValidationEvals, tRef, frameEndValidationPointables);
+ frameEndValid = allTrue(frameEndValidationPointables, booleanAccessor);
+ }
+ if (frameEndValid) {
+ evaluate(frameEndEvals, tRef, frameEndPointables);
}
}
- int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1;
-
- frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
- partitionReader.savePosition(TMP_POSITION_SLOT);
- IFrame frameInner = partitionReader.nextFrame(false);
- tAccess2.reset(frameInner.getBuffer());
-
- int tBeginIdxInner;
- if (tBeginIdxInnerStart < 0) {
- tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
- } else {
- tBeginIdxInner = tBeginIdxInnerStart;
- tBeginIdxInnerStart = -1;
+ if (frameStartValid && frameEndValid) {
+ if (frameExcludeExists) {
+ evaluate(frameExcludeEvals, tRef, frameExcludePointables);
}
- int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
+ int toSkip = 0;
+ if (frameOffsetExists) {
+ frameOffsetEval.evaluate(tRef, frameOffsetPointable);
+ toSkip = integerAccessor.getIntegerValue(frameOffsetPointable.getByteArray(),
+ frameOffsetPointable.getStartOffset(), frameOffsetPointable.getLength());
+ }
+ int toWrite = frameMaxObjects;
- for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner; tIdxInner++) {
- tRef2.reset(tAccess2, tIdxInner);
+ boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
+ int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0;
+ int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1;
- if (frameStartExists || frameEndExists) {
- evaluate(frameValueEvals, tRef2, frameValuePointables);
- if (frameStartExists) {
- if (frameValueComparators.compare(frameValuePointables, frameStartPointables) < 0) {
- // skip if value < start
- continue;
+ if (chunkIdxInnerStart < nChunks) {
+ if (frameStartForward && !isFirstTupleInPartition) {
+ partitionReader.restorePosition(FRAME_POSITION_SLOT);
+ } else {
+ partitionReader.rewind();
+ }
+ }
+
+ int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1;
+
+ frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
+ partitionReader.savePosition(TMP_POSITION_SLOT);
+ IFrame frameInner = partitionReader.nextFrame(false);
+ tAccess2.reset(frameInner.getBuffer());
+
+ int tBeginIdxInner;
+ if (tBeginIdxInnerStart < 0) {
+ tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
+ } else {
+ tBeginIdxInner = tBeginIdxInnerStart;
+ tBeginIdxInnerStart = -1;
+ }
+ int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
+
+ for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner; tIdxInner++) {
+ tRef2.reset(tAccess2, tIdxInner);
+
+ if (frameStartExists || frameEndExists) {
+ evaluate(frameValueEvals, tRef2, frameValuePointables);
+ if (frameStartExists) {
+ if (frameValueComparators.compare(frameValuePointables, frameStartPointables) < 0) {
+ // skip if value < start
+ continue;
+ }
+ // inside the frame
+ if (chunkIdxFrameStartLocal < 0) {
+ // save position of the first tuple in this frame
+ // will continue from it in the next frame iteration
+ chunkIdxFrameStartLocal = chunkIdxInner;
+ tBeginIdxFrameStartLocal = tIdxInner;
+ partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT);
+ }
}
- // inside the frame
- if (chunkIdxFrameStartLocal < 0) {
- // save position of the first tuple in this frame
- // will continue from it in the next frame iteration
- chunkIdxFrameStartLocal = chunkIdxInner;
- tBeginIdxFrameStartLocal = tIdxInner;
- partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT);
+ if (frameEndExists
+ && frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
+ // value > end => beyond the frame end
+ // exit the frame loop
+ break frame_loop;
}
}
- if (frameEndExists
- && frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
- // value > end => beyond the frame end
- // exit the frame loop
+ if (frameExcludeExists && isExcluded()) {
+ // skip if excluded
+ continue;
+ }
+
+ if (toSkip > 0) {
+ // skip if offset hasn't been reached
+ toSkip--;
+ continue;
+ }
+
+ if (toWrite != 0) {
+ nestedAggAggregate(tAccess2, tIdxInner);
+ }
+ if (toWrite > 0) {
+ toWrite--;
+ }
+ if (toWrite == 0) {
break frame_loop;
}
}
- if (frameExcludeExists && isExcluded()) {
- // skip if excluded
- continue;
- }
-
- if (toSkip > 0) {
- // skip if offset hasn't been reached
- toSkip--;
- continue;
- }
-
- if (toWrite != 0) {
- nestedAggAggregate(tAccess2, tIdxInner);
- }
- if (toWrite > 0) {
- toWrite--;
- }
- if (toWrite == 0) {
- break frame_loop;
- }
}
- }
- if (frameStartIsMonotonic) {
- if (chunkIdxFrameStartLocal >= 0) {
- chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal;
- tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal;
- } else {
- // frame start not found, set it beyond the last chunk
- chunkIdxFrameStartGlobal = nChunks;
- tBeginIdxFrameStartGlobal = 0;
+ if (frameStartIsMonotonic) {
+ if (chunkIdxFrameStartLocal >= 0) {
+ chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal;
+ tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal;
+ } else {
+ // frame start not found, set it beyond the last chunk
+ chunkIdxFrameStartGlobal = nChunks;
+ tBeginIdxFrameStartGlobal = 0;
+ }
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
index fe7e93f..7ce9668 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.algebricks.runtime.operators.win;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -63,8 +65,22 @@
private PointableTupleReference frameEndPointables;
+ private final boolean frameEndValidationExists;
+
+ private final IScalarEvaluatorFactory[] frameEndValidationEvalFactories;
+
+ private IScalarEvaluator[] frameEndValidationEvals;
+
+ private PointableTupleReference frameEndValidationPointables;
+
+ private IWindowAggregatorDescriptor nestedAggForInvalidFrame;
+
private final int frameMaxObjects;
+ private final IBinaryBooleanInspectorFactory booleanAccessorFactory;
+
+ private IBinaryBooleanInspector booleanAccessor;
+
private FrameTupleAccessor tAccess2;
private FrameTupleReference tRef2;
@@ -78,7 +94,8 @@
WindowNestedPlansRunningPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories,
- int frameMaxObjects, int[] projectionColumns, int[] runningAggOutColumns,
+ IScalarEvaluatorFactory[] frameEndValidationEvalFactories, int frameMaxObjects,
+ IBinaryBooleanInspectorFactory booleanAccessorFactory, int[] projectionColumns, int[] runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx, int memSizeInFrames,
SourceLocation sourceLoc) {
@@ -86,9 +103,13 @@
runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
memSizeInFrames, sourceLoc);
this.frameValueEvalFactories = frameValueEvalFactories;
- this.frameEndEvalFactories = frameEndEvalFactories;
this.frameValueComparatorFactories = frameValueComparatorFactories;
+ this.frameEndEvalFactories = frameEndEvalFactories;
+ this.frameEndValidationEvalFactories = frameEndValidationEvalFactories;
+ this.frameEndValidationExists =
+ frameEndValidationEvalFactories != null && frameEndValidationEvalFactories.length > 0;
this.frameMaxObjects = frameMaxObjects;
+ this.booleanAccessorFactory = booleanAccessorFactory;
}
@Override
@@ -99,6 +120,12 @@
frameValuePointables = createPointables(frameValueEvalFactories.length);
frameEndEvals = createEvaluators(frameEndEvalFactories, ctx);
frameEndPointables = createPointables(frameEndEvalFactories.length);
+ if (frameEndValidationExists) {
+ frameEndValidationEvals = createEvaluators(frameEndValidationEvalFactories, ctx);
+ frameEndValidationPointables = createPointables(frameEndValidationEvalFactories.length);
+ booleanAccessor = booleanAccessorFactory.createBinaryBooleanInspector(ctx);
+ nestedAggForInvalidFrame = nestedAggCreate();
+ }
tAccess2 = new FrameTupleAccessor(inputRecordDesc);
tRef2 = new FrameTupleReference();
}
@@ -107,6 +134,9 @@
protected void beginPartitionImpl() throws HyracksDataException {
super.beginPartitionImpl();
nestedAggInit();
+ if (frameEndValidationExists) {
+ nestedAggInit(nestedAggForInvalidFrame);
+ }
chunkIdxFrameEndGlobal = 0;
tBeginIdxFrameEndGlobal = -1;
toWrite = frameMaxObjects;
@@ -133,73 +163,88 @@
// running aggregates
produceTuple(tupleBuilder, tAccess, tIdx, tRef);
- // frame boundaries
- evaluate(frameEndEvals, tRef, frameEndPointables);
-
- int chunkIdxInnerStart = chunkIdxFrameEndGlobal;
- int tBeginIdxInnerStart = tBeginIdxFrameEndGlobal;
-
- if (chunkIdxInnerStart < nChunks) {
- if (!isFirstTupleInPartition) {
- partitionReader.restorePosition(FRAME_POSITION_SLOT);
- } else {
- partitionReader.rewind();
- }
+ // frame boundary
+ boolean frameEndValid = true;
+ if (frameEndValidationExists) {
+ evaluate(frameEndValidationEvals, tRef, frameEndValidationPointables);
+ frameEndValid = allTrue(frameEndValidationPointables, booleanAccessor);
}
- int chunkIdxFrameEndLocal = -1, tBeginIdxFrameEndLocal = -1;
+ if (frameEndValid) {
+ evaluate(frameEndEvals, tRef, frameEndPointables);
- frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
- partitionReader.savePosition(TMP_POSITION_SLOT);
- IFrame frameInner = partitionReader.nextFrame(false);
- tAccess2.reset(frameInner.getBuffer());
+ int chunkIdxInnerStart = chunkIdxFrameEndGlobal;
+ int tBeginIdxInnerStart = tBeginIdxFrameEndGlobal;
- int tBeginIdxInner;
- if (tBeginIdxInnerStart >= 0) {
- tBeginIdxInner = tBeginIdxInnerStart;
- tBeginIdxInnerStart = -1;
+ if (chunkIdxInnerStart < nChunks) {
+ if (!isFirstTupleInPartition) {
+ partitionReader.restorePosition(FRAME_POSITION_SLOT);
+ } else {
+ partitionReader.rewind();
+ }
+ }
+
+ int chunkIdxFrameEndLocal = -1, tBeginIdxFrameEndLocal = -1;
+
+ frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
+ partitionReader.savePosition(TMP_POSITION_SLOT);
+ IFrame frameInner = partitionReader.nextFrame(false);
+ tAccess2.reset(frameInner.getBuffer());
+
+ int tBeginIdxInner;
+ if (tBeginIdxInnerStart >= 0) {
+ tBeginIdxInner = tBeginIdxInnerStart;
+ tBeginIdxInnerStart = -1;
+ } else {
+ tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
+ }
+ int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
+
+ for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner && toWrite != 0; tIdxInner++) {
+ tRef2.reset(tAccess2, tIdxInner);
+
+ evaluate(frameValueEvals, tRef2, frameValuePointables);
+
+ if (frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
+ // value > end => beyond the frame end
+ // save position of the current tuple, will continue from it in the next outer iteration
+ chunkIdxFrameEndLocal = chunkIdxInner;
+ tBeginIdxFrameEndLocal = tIdxInner;
+ partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT);
+ // exit the frame loop
+ break frame_loop;
+ }
+
+ nestedAggAggregate(tAccess2, tIdxInner);
+
+ if (toWrite > 0) {
+ toWrite--;
+ }
+ }
+ }
+
+ if (chunkIdxFrameEndLocal >= 0) {
+ chunkIdxFrameEndGlobal = chunkIdxFrameEndLocal;
+ tBeginIdxFrameEndGlobal = tBeginIdxFrameEndLocal;
} else {
- tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
+ // frame end not found, set it beyond the last chunk
+ chunkIdxFrameEndGlobal = nChunks;
+ tBeginIdxFrameEndGlobal = 0;
}
- int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
- for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner && toWrite != 0; tIdxInner++) {
- tRef2.reset(tAccess2, tIdxInner);
-
- evaluate(frameValueEvals, tRef2, frameValuePointables);
-
- if (frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
- // value > end => beyond the frame end
- // save position of the current tuple, will continue from it in the next outer iteration
- chunkIdxFrameEndLocal = chunkIdxInner;
- tBeginIdxFrameEndLocal = tIdxInner;
- partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT);
- // exit the frame loop
- break frame_loop;
- }
-
- nestedAggAggregate(tAccess2, tIdxInner);
-
- if (toWrite > 0) {
- toWrite--;
- }
- }
- }
-
- if (chunkIdxFrameEndLocal >= 0) {
- chunkIdxFrameEndGlobal = chunkIdxFrameEndLocal;
- tBeginIdxFrameEndGlobal = tBeginIdxFrameEndLocal;
+ nestedAggOutputPartialResult(tupleBuilder);
} else {
- // frame end not found, set it beyond the last chunk
- chunkIdxFrameEndGlobal = nChunks;
- tBeginIdxFrameEndGlobal = 0;
+ nestedAggOutputPartialResult(nestedAggForInvalidFrame, tupleBuilder);
}
if (isLastTupleInPartition) {
- nestedAggOutputFinalResult(tupleBuilder);
- } else {
- nestedAggOutputPartialResult(tupleBuilder);
+ // we've already emitted accumulated partial result for this tuple, so discard it
+ nestAggDiscardFinalResult();
+ if (frameEndValidationExists) {
+ nestAggDiscardFinalResult(nestedAggForInvalidFrame);
+ }
}
+
appendToFrameFromTupleBuilder(tupleBuilder);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
index 53692d1..eb11c9e 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
@@ -21,6 +21,7 @@
import java.util.Arrays;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -42,30 +43,38 @@
private final IScalarEvaluatorFactory[] frameEndEvalFactories;
+ private final IScalarEvaluatorFactory[] frameEndValidationEvalFactories;
+
private final int frameMaxObjects;
+ private final IBinaryBooleanInspectorFactory booleanAccessorFactory;
+
public WindowNestedPlansRunningRuntimeFactory(int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories,
- int frameMaxObjects, int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
- IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
- WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
+ IScalarEvaluatorFactory[] frameEndValidationEvalFactories, int frameMaxObjects,
+ IBinaryBooleanInspectorFactory booleanAccessorFactory, int[] projectionColumnsExcludingSubplans,
+ int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
+ int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
nestedAggFactory, memSizeInFrames);
this.frameValueEvalFactories = frameValueEvalFactories;
this.frameValueComparatorFactories = frameValueComparatorFactories;
this.frameEndEvalFactories = frameEndEvalFactories;
+ this.frameEndValidationEvalFactories = frameEndValidationEvalFactories;
this.frameMaxObjects = frameMaxObjects;
+ this.booleanAccessorFactory = booleanAccessorFactory;
}
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
return new WindowNestedPlansRunningPushRuntime(partitionColumns, partitionComparatorFactories,
orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories, frameEndEvalFactories,
- frameMaxObjects, projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
- nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
+ frameEndValidationEvalFactories, frameMaxObjects, booleanAccessorFactory, projectionList,
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
+ memSizeInFrames, sourceLoc);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
index 4a7c837..43b1bf3 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
@@ -21,6 +21,7 @@
import java.util.Arrays;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -42,10 +43,14 @@
private final IScalarEvaluatorFactory[] frameStartEvalFactories;
+ private final IScalarEvaluatorFactory[] frameStartValidatinoEvalFactories;
+
private final boolean frameStartIsMonotonic;
private final IScalarEvaluatorFactory[] frameEndEvalFactories;
+ private final IScalarEvaluatorFactory[] frameEndValidationEvalFactories;
+
private final IScalarEvaluatorFactory[] frameExcludeEvalFactories;
private final int frameExcludeNegationStartIdx;
@@ -54,44 +59,51 @@
private final IScalarEvaluatorFactory frameOffsetEvalFactory;
- private final IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
-
private final int frameMaxObjects;
+ private final IBinaryBooleanInspectorFactory booleanAccessorFactory;
+
+ private final IBinaryIntegerInspectorFactory integerAccessorFactory;
+
public WindowNestedPlansRuntimeFactory(int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories,
- boolean frameStartIsMonotonic, IScalarEvaluatorFactory[] frameEndEvalFactories,
+ IScalarEvaluatorFactory[] frameStartValidationEvalFactories, boolean frameStartIsMonotonic,
+ IScalarEvaluatorFactory[] frameEndEvalFactories, IScalarEvaluatorFactory[] frameEndValidationEvalFactories,
IScalarEvaluatorFactory[] frameExcludeEvalFactories, int frameExcludeNegationStartIdx,
IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory,
- IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects,
- int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
- IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
- WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
+ int frameMaxObjects, IBinaryBooleanInspectorFactory booleanAccessorFactory,
+ IBinaryIntegerInspectorFactory integerAccessorFactory, int[] projectionColumnsExcludingSubplans,
+ int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
+ int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
nestedAggFactory, memSizeInFrames);
this.frameValueEvalFactories = frameValueEvalFactories;
this.frameValueComparatorFactories = frameValueComparatorFactories;
this.frameStartEvalFactories = frameStartEvalFactories;
+ this.frameStartValidatinoEvalFactories = frameStartValidationEvalFactories;
this.frameStartIsMonotonic = frameStartIsMonotonic;
this.frameEndEvalFactories = frameEndEvalFactories;
+ this.frameEndValidationEvalFactories = frameEndValidationEvalFactories;
this.frameExcludeEvalFactories = frameExcludeEvalFactories;
this.frameExcludeComparatorFactories = frameExcludeComparatorFactories;
this.frameExcludeNegationStartIdx = frameExcludeNegationStartIdx;
this.frameOffsetEvalFactory = frameOffsetEvalFactory;
- this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
this.frameMaxObjects = frameMaxObjects;
+ this.booleanAccessorFactory = booleanAccessorFactory;
+ this.integerAccessorFactory = integerAccessorFactory;
}
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
return new WindowNestedPlansPushRuntime(partitionColumns, partitionComparatorFactories,
orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories,
- frameStartEvalFactories, frameStartIsMonotonic, frameEndEvalFactories, frameExcludeEvalFactories,
- frameExcludeNegationStartIdx, frameExcludeComparatorFactories, frameOffsetEvalFactory,
- binaryIntegerInspectorFactory, frameMaxObjects, projectionList, runningAggOutColumns,
+ frameStartEvalFactories, frameStartValidatinoEvalFactories, frameStartIsMonotonic,
+ frameEndEvalFactories, frameEndValidationEvalFactories, frameExcludeEvalFactories,
+ frameExcludeNegationStartIdx, frameExcludeComparatorFactories, frameOffsetEvalFactory, frameMaxObjects,
+ booleanAccessorFactory, integerAccessorFactory, projectionList, runningAggOutColumns,
runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
}