[NO ISSUE][RT] Window operator runtime optimization
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
- Improve memory management for window operators
- Add "compiler.windowmemory" property that specifies memory
budget for each window operator (default is 4MB, min is 160KB)
- Consolidated negative window operator testcases into a single one
Change-Id: I6756e92046883f79db339ef490cca8bc8b7b1fb8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3227
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index a1f819d..efffda2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import org.apache.asterix.algebra.operators.physical.BTreeSearchPOperator;
import org.apache.asterix.algebra.operators.physical.InvertedIndexPOperator;
@@ -68,7 +67,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
@@ -285,7 +283,7 @@
}
case WINDOW: {
WindowOperator winOp = (WindowOperator) op;
- WindowPOperator physOp = createWindowPOperator(winOp);
+ WindowPOperator physOp = createWindowPOperator(winOp, context);
op.setPhysicalOperator(physOp);
break;
}
@@ -344,7 +342,8 @@
aggOp.setMergeExpressions(mergeExpressionRefs);
}
- private static WindowPOperator createWindowPOperator(WindowOperator winOp) throws CompilationException {
+ private static WindowPOperator createWindowPOperator(WindowOperator winOp, IOptimizationContext context)
+ throws CompilationException {
List<Mutable<ILogicalExpression>> partitionExprs = winOp.getPartitionExpressions();
List<LogicalVariable> partitionColumns = new ArrayList<>(partitionExprs.size());
for (Mutable<ILogicalExpression> pe : partitionExprs) {
@@ -377,7 +376,9 @@
boolean nestedTrivialAggregates = winOp.hasNestedPlans()
&& winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
+ int memSizeInFrames = context.getPhysicalOptimizationConfig().getMaxFramesForWindow();
+
return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic,
- frameEndIsMonotonic, nestedTrivialAggregates);
+ frameEndIsMonotonic, nestedTrivialAggregates, memSizeInFrames);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 0bdc987..d155756 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -130,14 +130,14 @@
public static final String PREFIX_INTERNAL_PARAMETERS = "_internal";
private static final Set<String> CONFIGURABLE_PARAMETER_NAMES =
ImmutableSet.of(CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY,
- CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY,
- CompilerProperties.COMPILER_PARALLELISM_KEY, CompilerProperties.COMPILER_SORT_PARALLEL_KEY,
- CompilerProperties.COMPILER_SORT_SAMPLES_KEY, FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
- FuzzyUtils.SIM_FUNCTION_PROP_NAME, FuzzyUtils.SIM_THRESHOLD_PROP_NAME,
- StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME,
- FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION,
- SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type",
- AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION);
+ CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_WINDOWMEMORY_KEY,
+ CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY,
+ CompilerProperties.COMPILER_SORT_PARALLEL_KEY, CompilerProperties.COMPILER_SORT_SAMPLES_KEY,
+ FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, FuzzyUtils.SIM_FUNCTION_PROP_NAME,
+ FuzzyUtils.SIM_THRESHOLD_PROP_NAME, StartFeedStatement.WAIT_FOR_COMPLETION,
+ FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS,
+ SqlppQueryRewriter.INLINE_WITH_OPTION, SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION,
+ "hash_merge", "output-record-type", AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION);
private final IRewriterFactory rewriterFactory;
private final IAstPrintVisitorFactory astPrintVisitorFactory;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index 2942a95..3fffdac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -36,15 +36,17 @@
private final long groupByMemorySize;
private final long joinMemorySize;
private final long sortMemorySize;
+ private final long windowMemorySize;
private final long textSearchMemorySize;
private final long frameSize;
public OperatorResourcesComputer(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit,
- int joinFrameLimit, int textSearchFrameLimit, long frameSize) {
+ int joinFrameLimit, int windowFrameLimit, int textSearchFrameLimit, long frameSize) {
this.numComputationPartitions = numComputationPartitions;
this.groupByMemorySize = groupFrameLimit * frameSize;
this.joinMemorySize = joinFrameLimit * frameSize;
this.sortMemorySize = sortFrameLimit * frameSize;
+ this.windowMemorySize = windowFrameLimit * frameSize;
this.textSearchMemorySize = textSearchFrameLimit * frameSize;
this.frameSize = frameSize;
}
@@ -145,13 +147,9 @@
private long getWindowRequiredMemory(WindowOperator op) {
WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
- int frameCount = 2;
- if (physOp.isPartitionMaterialization()) {
- frameCount++;
- }
- if (op.hasNestedPlans()) {
- frameCount += 2;
- }
- return getOperatorRequiredMemory(op, frameSize * frameCount);
+ // memory budget configuration only applies to window operators that materialize partitions (non-streaming)
+ // streaming window operators only need 2 frames: output + copy
+ long memorySize = physOp.isPartitionMaterialization() ? windowMemorySize : 2 * frameSize;
+ return getOperatorRequiredMemory(op, memorySize);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index d9ead33..0f4c4c0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -58,10 +58,11 @@
final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy();
final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin();
+ final int windowFrameLimit = physicalOptimizationConfig.getMaxFramesForWindow();
final int textSearchFrameLimit = physicalOptimizationConfig.getMaxFramesForTextSearch();
final List<PlanStage> planStages = getStages(plan);
return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, sortFrameLimit,
- groupFrameLimit, joinFrameLimit, textSearchFrameLimit, frameSize);
+ groupFrameLimit, joinFrameLimit, windowFrameLimit, textSearchFrameLimit, frameSize);
}
public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException {
@@ -73,9 +74,10 @@
}
public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations,
- int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int textSearchFrameLimit, int frameSize) {
+ int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int windowFrameLimit, int textSearchFrameLimit,
+ int frameSize) {
final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, sortFrameLimit,
- groupFrameLimit, joinFrameLimit, textSearchFrameLimit, frameSize);
+ groupFrameLimit, joinFrameLimit, windowFrameLimit, textSearchFrameLimit, frameSize);
final IClusterCapacity clusterCapacity = new ClusterCapacity();
final Long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
.orElseThrow(IllegalStateException::new);
diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf b/asterixdb/asterix-app/src/main/resources/cc.conf
index 8877be8..c9c7bb9 100644
--- a/asterixdb/asterix-app/src/main/resources/cc.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc.conf
@@ -53,6 +53,7 @@
compiler.groupmemory=160KB
compiler.joinmemory=256KB
compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
compiler.sort.parallel=false
messaging.frame.size=4096
messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/main/resources/cc2.conf b/asterixdb/asterix-app/src/main/resources/cc2.conf
index 65dbafc..46f7168 100644
--- a/asterixdb/asterix-app/src/main/resources/cc2.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc2.conf
@@ -53,6 +53,7 @@
compiler.groupmemory=160KB
compiler.joinmemory=256KB
compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
compiler.parallelism=-1
messaging.frame.size=4096
messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/main/resources/cc3.conf b/asterixdb/asterix-app/src/main/resources/cc3.conf
index 20aa70d..0b26ef3 100644
--- a/asterixdb/asterix-app/src/main/resources/cc3.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc3.conf
@@ -53,6 +53,7 @@
compiler.groupmemory=160KB
compiler.joinmemory=256KB
compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
compiler.parallelism=3
messaging.frame.size=4096
messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/main/resources/cc4.conf b/asterixdb/asterix-app/src/main/resources/cc4.conf
index 5bdf8ea..6a66d25 100644
--- a/asterixdb/asterix-app/src/main/resources/cc4.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc4.conf
@@ -50,6 +50,7 @@
compiler.sortmemory=320KB
compiler.groupmemory=160KB
compiler.joinmemory=256KB
+compiler.windowmemory=192KB
messaging.frame.size=4096
messaging.frame.count=512
compiler.parallelism=-1
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index d3113ca..094009e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -301,7 +301,7 @@
private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) {
final IClusterCapacity clusterCapacity = ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM,
- FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE);
+ FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE);
Assert.assertEquals(clusterCapacity.getAggregatedMemoryByteSize(), expectedMemory);
}
}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-compression.conf b/asterixdb/asterix-app/src/test/resources/cc-compression.conf
index 904707a..e58d691 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-compression.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-compression.conf
@@ -53,5 +53,6 @@
compiler.groupmemory=160KB
compiler.joinmemory=256KB
compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
messaging.frame.size=4096
messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/test/resources/cc-multipart.conf b/asterixdb/asterix-app/src/test/resources/cc-multipart.conf
index 9c64ab4..4d2087b 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-multipart.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-multipart.conf
@@ -51,5 +51,6 @@
compiler.sortmemory=320KB
compiler.groupmemory=160KB
compiler.joinmemory=256KB
+compiler.windowmemory=192KB
messaging.frame.size=4096
messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
index 811a40d..8995e19 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
@@ -49,6 +49,7 @@
compiler.sortmemory=320KB
compiler.groupmemory=160KB
compiler.joinmemory=256KB
+compiler.windowmemory=192KB
messaging.frame.size=4096
messaging.frame.count=512
txn.log.partitionsize=2MB
diff --git a/asterixdb/asterix-app/src/test/resources/cc-ssl.conf b/asterixdb/asterix-app/src/test/resources/cc-ssl.conf
index ea00513..db04d2b 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-ssl.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-ssl.conf
@@ -64,6 +64,7 @@
compiler.groupmemory=160KB
compiler.joinmemory=256KB
compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
messaging.frame.size=4096
messaging.frame.count=512
ssl.enabled=true
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/cc-storage.conf b/asterixdb/asterix-app/src/test/resources/cc-storage.conf
index b6bed24..cf748c6 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-storage.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-storage.conf
@@ -51,6 +51,7 @@
compiler.groupmemory=160KB
compiler.joinmemory=256KB
compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
messaging.frame.size=4096
messaging.frame.count=512
txn.log.checkpoint.pollfrequency=10
diff --git a/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf b/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf
index ef38fc2..1d05c28 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf
@@ -53,6 +53,7 @@
compiler.sortmemory=320KB
compiler.groupmemory=160KB
compiler.joinmemory=256KB
+compiler.windowmemory=192KB
compiler.stringoffset=1
messaging.frame.size=4096
messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/test/resources/cc.conf b/asterixdb/asterix-app/src/test/resources/cc.conf
index 2694408..adb71a5 100644
--- a/asterixdb/asterix-app/src/test/resources/cc.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc.conf
@@ -53,5 +53,6 @@
compiler.groupmemory=160KB
compiler.joinmemory=256KB
compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
messaging.frame.size=4096
messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp
index 5655f18..e58ba11 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp
@@ -21,6 +21,8 @@
* Expected Res : SUCCESS
*/
+set `compiler.windowmemory` "256KB";
+
use test;
q1_ntile(10, 1000, 4)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp
index 3120eb3..d477713 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp
@@ -21,6 +21,8 @@
* Expected Res : SUCCESS
*/
+set `compiler.windowmemory` "192KB";
+
use test;
q1_percent_rank(10, 1000, 3)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp
index 8f38718..8b92104 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp
@@ -21,6 +21,8 @@
* Expected Res : SUCCESS
*/
+set `compiler.windowmemory` "512KB";
+
use test;
from q0_rnd() rnd
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp
index d73a7f4..36f40c6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp
@@ -21,6 +21,8 @@
* Expected Res : SUCCESS
*/
+set `compiler.windowmemory` "224KB";
+
use test;
FROM tenk1
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_02/ntile_02.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.1.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_02/ntile_02.1.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.1.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ratio_to_report_02_negative/ratio_to_report_02_negative.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.2.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ratio_to_report_02_negative/ratio_to_report_02_negative.1.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.2.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.3.query.sqlpp
new file mode 100644
index 0000000..23be774
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.3.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test illegal value for 'compiler.windowmemory'
+ * : Must be at least 160KB (5 frames)
+ * Expected Res : FAILURE
+ */
+
+set `compiler.windowmemory` "100KB";
+
+from range(1, 10) t
+select t, first_value(t) over(order by t)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
index 2d561b9..1e4d076 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
@@ -22,6 +22,9 @@
* Expected Res : SUCCESS
*/
+/* 1 frame for partition writer */
+set `compiler.windowmemory` "160KB";
+
use test;
q1_sum_1_preceding_1_following(10);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
index fec6158..4546867 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
@@ -22,6 +22,9 @@
* Expected Res : SUCCESS
*/
+/* 1 frame for partition writer */
+set `compiler.windowmemory` "160KB";
+
use test;
q1_sum_1_preceding_1_following(10000);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
index 84b5234..d772965 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
@@ -22,6 +22,9 @@
* Expected Res : SUCCESS
*/
+/* 2 frames for partition writer */
+set `compiler.windowmemory` "192KB";
+
use test;
with N as 10000, W as 5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
index 8a4374f..8c0c6b9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
@@ -23,6 +23,9 @@
* Expected Res : SUCCESS
*/
+/* 2 frames for partition writer */
+set `compiler.windowmemory` "192KB";
+
use test;
with N as 10000, W as 5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
index 91c0a31..5f5c508 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
@@ -23,6 +23,9 @@
* Expected Res : SUCCESS
*/
+/* 2 frames for partition writer */
+set `compiler.windowmemory` "192KB";
+
use test;
with N as 10000, W as 5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
index ad6913c..a2d3a53 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
@@ -22,6 +22,9 @@
* Expected Res : SUCCESS
*/
+/* 1 frame for partition writer */
+set `compiler.windowmemory` "160KB";
+
use test;
q2_max_unbounded_preceding_n_following(5000);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index da464c7..2bdb6c1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -16,6 +16,7 @@
"compiler\.sort\.samples" : 100,
"compiler\.sortmemory" : 327680,
"compiler\.textsearchmemory" : 163840,
+ "compiler\.windowmemory" : 196608,
"default\.dir" : "target/io/dir/asterixdb",
"log\.dir" : "logs/",
"log\.level" : "INFO",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index fa8f48e..599d6b4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -16,6 +16,7 @@
"compiler\.sort\.samples" : 100,
"compiler\.sortmemory" : 327680,
"compiler\.textsearchmemory" : 163840,
+ "compiler\.windowmemory" : 196608,
"default\.dir" : "target/io/dir/asterixdb",
"log\.dir" : "logs/",
"log\.level" : "WARN",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 801900c..56d9dd9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -16,6 +16,7 @@
"compiler\.sort\.samples" : 100,
"compiler\.sortmemory" : 327680,
"compiler\.textsearchmemory" : 163840,
+ "compiler\.windowmemory" : 196608,
"default\.dir" : "target/io/dir/asterixdb",
"log\.dir" : "logs/",
"log\.level" : "WARN",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index d368b45..c905e0c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -9278,13 +9278,6 @@
</compilation-unit>
</test-case>
<test-case FilePath="window">
- <compilation-unit name="ntile_02">
- <output-dir compare="Text">ntile_01</output-dir>
- <expected-error>ASX0002: Type mismatch</expected-error>
- <source-location>false</source-location>
- </compilation-unit>
- </test-case>
- <test-case FilePath="window">
<compilation-unit name="percent_rank_01">
<output-dir compare="Text">percent_rank_01</output-dir>
</compilation-unit>
@@ -9305,14 +9298,17 @@
</compilation-unit>
</test-case>
<test-case FilePath="window">
- <compilation-unit name="ratio_to_report_02_negative">
- <output-dir compare="Text">ratio_to_report_01</output-dir>
- <expected-error>ASX1101: Unexpected ORDER BY clause in window expression</expected-error>
+ <compilation-unit name="row_number_01">
+ <output-dir compare="Text">row_number_01</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="window">
- <compilation-unit name="row_number_01">
- <output-dir compare="Text">row_number_01</output-dir>
+ <compilation-unit name="win_negative">
+ <output-dir compare="Text">misc_01</output-dir>
+ <expected-error>ASX0002: Type mismatch</expected-error>
+ <expected-error>ASX1101: Unexpected ORDER BY clause in window expression</expected-error>
+ <expected-error>ASX1037: Invalid query parameter compiler.windowmemory</expected-error>
+ <source-location>false</source-location>
</compilation-unit>
</test-case>
<test-case FilePath="window">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 4bfbf11..7c67b96 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -43,6 +43,10 @@
LONG_BYTE_UNIT,
StorageUtil.getLongSizeInBytes(32L, MEGABYTE),
"The memory budget (in bytes) for a group by operator instance in a partition"),
+ COMPILER_WINDOWMEMORY(
+ LONG_BYTE_UNIT,
+ StorageUtil.getLongSizeInBytes(4L, MEGABYTE),
+ "The memory budget (in bytes) for a window operator instance in a partition"),
COMPILER_TEXTSEARCHMEMORY(
LONG_BYTE_UNIT,
StorageUtil.getLongSizeInBytes(32L, MEGABYTE),
@@ -108,6 +112,8 @@
public static final String COMPILER_JOINMEMORY_KEY = Option.COMPILER_JOINMEMORY.ini();
+ public static final String COMPILER_WINDOWMEMORY_KEY = Option.COMPILER_WINDOWMEMORY.ini();
+
public static final String COMPILER_TEXTSEARCHMEMORY_KEY = Option.COMPILER_TEXTSEARCHMEMORY.ini();
public static final String COMPILER_PARALLELISM_KEY = Option.COMPILER_PARALLELISM.ini();
@@ -134,6 +140,10 @@
return accessor.getLong(Option.COMPILER_GROUPMEMORY);
}
+ public long getWindowMemorySize() {
+ return accessor.getLong(Option.COMPILER_WINDOWMEMORY);
+ }
+
public long getTextSearchMemorySize() {
return accessor.getLong(Option.COMPILER_TEXTSEARCHMEMORY);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index 9269b5e..e8643cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -33,6 +33,8 @@
private static final int MIN_FRAME_LIMIT_FOR_SORT = 3;
private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = 4;
private static final int MIN_FRAME_LIMIT_FOR_JOIN = 5;
+ // 1 (output) + 1 (input copy) + 1 (partition writer) + 2 (seekable partition reader)
+ private static final int MIN_FRAME_LIMIT_FOR_WINDOW = 5;
// one for query, two for intermediate results, one for final result, and one for reading an inverted list
private static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5;
@@ -49,6 +51,9 @@
int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
(String) querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN, sourceLoc);
+ int windowFrameLimit = getFrameLimit(CompilerProperties.COMPILER_WINDOWMEMORY_KEY,
+ (String) querySpecificConfig.get(CompilerProperties.COMPILER_WINDOWMEMORY_KEY),
+ compilerProperties.getWindowMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_WINDOW, sourceLoc);
int textSearchFrameLimit = getTextSearchNumFrames(compilerProperties, querySpecificConfig, sourceLoc);
int sortNumSamples = getSortSamples(compilerProperties, querySpecificConfig, sourceLoc);
boolean fullParallelSort = getSortParallel(compilerProperties, querySpecificConfig);
@@ -58,6 +63,7 @@
physOptConf.setMaxFramesExternalSort(sortFrameLimit);
physOptConf.setMaxFramesExternalGroupBy(groupFrameLimit);
physOptConf.setMaxFramesForJoin(joinFrameLimit);
+ physOptConf.setMaxFramesForWindow(windowFrameLimit);
physOptConf.setMaxFramesForTextSearch(textSearchFrameLimit);
physOptConf.setSortParallel(fullParallelSort);
physOptConf.setSortSamples(sortNumSamples);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 1d8c47c..9a5f6d6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -82,15 +82,19 @@
private final boolean nestedTrivialAggregates;
+ // The maximum number of in-memory frames that this operator can use.
+ private final int memSizeInFrames;
+
public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, boolean frameEndIsMonotonic,
- boolean nestedTrivialAggregates) {
+ boolean nestedTrivialAggregates, int memSizeInFrames) {
this.partitionColumns = partitionColumns;
this.partitionMaterialization = partitionMaterialization;
this.orderColumns = orderColumns;
this.frameStartIsMonotonic = frameStartIsMonotonic;
this.frameEndIsMonotonic = frameEndIsMonotonic;
this.nestedTrivialAggregates = nestedTrivialAggregates;
+ this.memSizeInFrames = memSizeInFrames;
}
@Override
@@ -227,7 +231,7 @@
runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
partitionComparatorFactories, orderComparatorFactories, frameMaxObjects,
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory);
+ aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
} else if (frameEndIsMonotonic && nestedTrivialAggregates) {
// special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
// trivial aggregate subplan ( aggregate + nts )
@@ -236,7 +240,8 @@
partitionComparatorFactories, orderComparatorFactories,
frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second,
frameEndExprEvals, frameMaxObjects, projectionColumnsExcludingSubplans,
- runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory);
+ runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory,
+ memSizeInFrames);
}
}
// default case
@@ -248,12 +253,12 @@
winOp.getFrameExcludeNegationStartIdx(), frameExcludeExprEvalsAndComparators.second,
frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), frameMaxObjects,
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory);
+ aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
}
} else if (partitionMaterialization) {
runtime = new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
- runningAggFactories);
+ runningAggFactories, memSizeInFrames);
} else {
runtime = new WindowSimpleRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index d879d36..f9ea0c4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -30,6 +30,7 @@
private static final String MAX_FRAMES_EXTERNAL_GROUP_BY = "MAX_FRAMES_EXTERNAL_GROUP_BY";
private static final String MAX_FRAMES_FOR_JOIN_LEFT_INPUT = "MAX_FRAMES_FOR_JOIN_LEFT_INPUT";
private static final String MAX_FRAMES_FOR_JOIN = "MAX_FRAMES_FOR_JOIN";
+ private static final String MAX_FRAMES_FOR_WINDOW = "MAX_FRAMES_FOR_WINDOW";
private static final String MAX_FRAMES_FOR_TEXTSEARCH = "MAX_FRAMES_FOR_TEXTSEARCH";
private static final String FUDGE_FACTOR = "FUDGE_FACTOR";
private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME";
@@ -113,6 +114,15 @@
setInt(MAX_FRAMES_EXTERNAL_SORT, frameLimit);
}
+ public int getMaxFramesForWindow() {
+ int frameSize = getFrameSize();
+ return getInt(MAX_FRAMES_FOR_WINDOW, (int) (((long) 4 * MB) / frameSize));
+ }
+
+ public void setMaxFramesForWindow(int frameLimit) {
+ setInt(MAX_FRAMES_FOR_WINDOW, frameLimit);
+ }
+
public int getMaxFramesForTextSearch() {
int frameSize = getFrameSize();
return getInt(MAX_FRAMES_FOR_TEXTSEARCH, (int) (((long) 32 * MB) / frameSize));
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
index 0449cf5..b6cd0eb 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
@@ -86,10 +86,11 @@
return new ArrayTupleBuilder(projectionList.length);
}
- protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, int endIdx) throws HyracksDataException {
+ protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, int endIdx, FrameTupleReference tupleRef)
+ throws HyracksDataException {
for (int t = beginIdx; t <= endIdx; t++) {
- tRef.reset(accessor, t);
- produceTuple(tupleBuilder, accessor, t, tRef);
+ tupleRef.reset(accessor, t);
+ produceTuple(tupleBuilder, accessor, t, tupleRef);
appendToFrameFromTupleBuilder(tupleBuilder);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
index 4ca166f..fd3e97d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
@@ -36,6 +36,6 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
- produceTuples(tAccess, 0, tAccess.getTupleCount() - 1);
+ produceTuples(tAccess, 0, tAccess.getTupleCount() - 1, tRef);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
index 9adeb4d..807f6bf 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -49,9 +50,10 @@
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
- WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+ WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx, int memSizeInFrames,
+ SourceLocation sourceLoc) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
- runningAggOutColumns, runningAggFactories, ctx);
+ runningAggOutColumns, runningAggFactories, ctx, memSizeInFrames, sourceLoc);
this.nestedAggFactory = nestedAggFactory;
this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
}
@@ -75,23 +77,23 @@
/**
* Aggregator created by
- * {@link WindowAggregatorDescriptorFactory#createAggregator(IHyracksTaskContext, RecordDescriptor, RecordDescriptor, int[], int[], long)
- * WindowAggregatorDescriptorFactory.createAggregator(...)}
+ * {@link WindowAggregatorDescriptorFactory#createAggregator(IHyracksTaskContext, RecordDescriptor,
+ * RecordDescriptor, int[], int[], long) WindowAggregatorDescriptorFactory.createAggregator(...)}
* does not process argument tuple in init()
*/
- void nestedAggInit() throws HyracksDataException {
+ final void nestedAggInit() throws HyracksDataException {
nestedAgg.init(null, null, -1, null);
}
- void nestedAggAggregate(FrameTupleAccessor tAccess, int tIndex) throws HyracksDataException {
+ final void nestedAggAggregate(FrameTupleAccessor tAccess, int tIndex) throws HyracksDataException {
nestedAgg.aggregate(tAccess, tIndex, null, -1, null);
}
- void nestedAggOutputFinalResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
+ final void nestedAggOutputFinalResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
nestedAgg.outputFinalResult(outTupleBuilder, null, -1, null);
}
- void nestedAggOutputPartialResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
+ final void nestedAggOutputPartialResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
nestedAgg.outputPartialResult(outTupleBuilder, null, -1, null);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
index 53857ac..0b8cc06 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
@@ -25,7 +25,7 @@
/**
* Base class for window runtime factories that compute nested aggregates
*/
-abstract class AbstractWindowNestedPlansRuntimeFactory extends AbstractWindowRuntimeFactory {
+abstract class AbstractWindowNestedPlansRuntimeFactory extends WindowMaterializingRuntimeFactory {
private static final long serialVersionUID = 1L;
@@ -37,9 +37,9 @@
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
- int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory) {
+ int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+ projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, memSizeInFrames);
this.nestedAggFactory = nestedAggFactory;
this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
index a63aaf8..9cc25d0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
@@ -30,12 +30,14 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> {
+ protected final SourceLocation sourceLoc;
private final int[] partitionColumns;
private final IBinaryComparatorFactory[] partitionComparatorFactories;
private IBinaryComparator[] partitionComparators;
@@ -48,11 +50,20 @@
AbstractWindowPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
- IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) {
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx,
+ SourceLocation sourceLoc) {
super(projectionColumns, runningAggOutColumns, runningAggFactories, IWindowAggregateEvaluator.class, ctx);
this.partitionColumns = partitionColumns;
this.partitionComparatorFactories = partitionComparatorFactories;
this.orderComparatorFactories = orderComparatorFactories;
+ this.sourceLoc = sourceLoc;
+ }
+
+ /**
+ * Number of frames reserved by this operator: {@link #frame}, {@link #copyFrame}
+ */
+ int getReservedFrameCount() {
+ return 2;
}
@Override
@@ -78,7 +89,7 @@
@Override
public void close() throws HyracksDataException {
- if (inPartition) {
+ if (inPartition && !failed) {
endPartition();
}
super.close();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
index 4e97d6c..889803a 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
@@ -22,16 +22,11 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.storage.common.arraylist.IntArrayList;
/**
@@ -40,46 +35,42 @@
*/
class WindowMaterializingPushRuntime extends AbstractWindowPushRuntime {
+ private final int memSizeInFrames;
+
private long partitionLength;
- IFrame curFrame;
+ private WindowPartitionWriter partitionWriter;
- private long curFrameId;
+ WindowPartitionReader partitionReader;
private int chunkBeginIdx;
private IntArrayList chunkEndIdx;
- private RunFileWriter run;
-
- private long runLastFrameId;
-
WindowMaterializingPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
- IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) {
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx, int memSizeInFrames,
+ SourceLocation sourceLoc) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
- runningAggOutColumns, runningAggFactories, ctx);
- }
-
- @Override
- public void open() throws HyracksDataException {
- super.open();
- run = null;
- curFrameId = -1;
+ runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
+ this.memSizeInFrames = memSizeInFrames;
}
@Override
protected void init() throws HyracksDataException {
super.init();
- curFrame = new VSizeFrame(ctx);
+ String runFilePrefix = getClass().getName();
+ partitionWriter = new WindowPartitionWriter(ctx, memSizeInFrames - getReservedFrameCount(), runFilePrefix,
+ getPartitionReaderSlotCount(), sourceLoc);
+ partitionReader = partitionWriter.getReader();
chunkEndIdx = new IntArrayList(128, 128);
}
@Override
public void close() throws HyracksDataException {
super.close();
- if (run != null) {
- run.erase();
+ if (partitionWriter != null) {
+ partitionWriter.close();
}
}
@@ -87,41 +78,17 @@
protected void beginPartitionImpl() throws HyracksDataException {
chunkEndIdx.clear();
partitionLength = 0;
- if (run != null) {
- run.rewind();
- }
+ partitionWriter.reset();
}
@Override
protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
throws HyracksDataException {
- // save the frame. first one to memory, remaining ones to the run file
boolean isFirstChunk = chunkEndIdx.isEmpty();
+ partitionWriter.nextFrame(frameId, frameBuffer);
if (isFirstChunk) {
- if (frameId != curFrameId) {
- int pos = frameBuffer.position();
- curFrame.ensureFrameSize(frameBuffer.capacity());
- FrameUtils.copyAndFlip(frameBuffer, curFrame.getBuffer());
- frameBuffer.position(pos);
- curFrameId = frameId;
- }
chunkBeginIdx = tBeginIdx;
- } else {
- if (tBeginIdx != 0) {
- throw new IllegalStateException(String.valueOf(tBeginIdx));
- }
- if (run == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(getClass().getSimpleName());
- run = new RunFileWriter(file, ctx.getIoManager());
- run.open();
- }
- int pos = frameBuffer.position();
- frameBuffer.position(0);
- run.nextFrame(frameBuffer);
- frameBuffer.position(pos);
- runLastFrameId = frameId;
}
-
chunkEndIdx.add(tEndIdx);
partitionLength += tEndIdx - tBeginIdx + 1;
}
@@ -130,40 +97,32 @@
protected void endPartitionImpl() throws HyracksDataException {
runningAggInitPartition(partitionLength);
- int nChunks = getPartitionChunkCount();
- if (nChunks == 1) {
- producePartitionTuples(0, null);
- } else {
- GeneratedRunFileReader reader = run.createReader();
- reader.open();
- try {
- for (int chunkIdx = 0; chunkIdx < nChunks; chunkIdx++) {
- if (chunkIdx > 0) {
- reader.nextFrame(curFrame);
- }
- producePartitionTuples(chunkIdx, reader);
- }
- curFrameId = runLastFrameId;
- } finally {
- reader.close();
- }
+ partitionReader.open();
+ for (int chunkIdx = 0, nChunks = getPartitionChunkCount(); chunkIdx < nChunks; chunkIdx++) {
+ IFrame chunkFrame = partitionReader.nextFrame(true);
+ producePartitionTuples(chunkIdx, chunkFrame);
}
+ partitionReader.close();
}
- protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
- tAccess.reset(curFrame.getBuffer());
- produceTuples(tAccess, getTupleBeginIdx(chunkIdx), getTupleEndIdx(chunkIdx));
+ void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException {
+ tAccess.reset(chunkFrame.getBuffer());
+ produceTuples(tAccess, getTupleBeginIdx(chunkIdx), getTupleEndIdx(chunkIdx), tRef);
}
- int getPartitionChunkCount() {
+ final int getPartitionChunkCount() {
return chunkEndIdx.size();
}
- int getTupleBeginIdx(int chunkIdx) {
+ final int getTupleBeginIdx(int chunkIdx) {
return chunkIdx == 0 ? chunkBeginIdx : 0;
}
- int getTupleEndIdx(int chunkIdx) {
+ final int getTupleEndIdx(int chunkIdx) {
return chunkEndIdx.get(chunkIdx);
}
+
+ int getPartitionReaderSlotCount() {
+ return -1; // forward only reader by default
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
index 1b02fb1..8bb3147 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
@@ -34,18 +34,22 @@
private static final long serialVersionUID = 1L;
+ final int memSizeInFrames;
+
public WindowMaterializingRuntimeFactory(int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
- int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories) {
+ int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories, int memSizeInFrames) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+ this.memSizeInFrames = memSizeInFrames;
}
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
return new WindowMaterializingPushRuntime(partitionColumns, partitionComparatorFactories,
- orderComparatorFactories, projectionList, runningAggOutColumns, runningAggFactories, ctx);
+ orderComparatorFactories, projectionList, runningAggOutColumns, runningAggFactories, ctx,
+ memSizeInFrames, sourceLoc);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
index cb4f534..aa9d402 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -19,27 +19,23 @@
package org.apache.hyracks.algebricks.runtime.operators.win;
-import java.nio.ByteBuffer;
-
import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.DataUtils;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.storage.common.MultiComparator;
/**
@@ -48,6 +44,14 @@
*/
class WindowNestedPlansPushRuntime extends AbstractWindowNestedPlansPushRuntime {
+ private static final int PARTITION_POSITION_SLOT = 0;
+
+ private static final int FRAME_POSITION_SLOT = 1;
+
+ private static final int TMP_POSITION_SLOT = 2;
+
+ private static final int PARTITION_READER_SLOT_COUNT = TMP_POSITION_SLOT + 1;
+
private final boolean frameValueExists;
private final IScalarEvaluatorFactory[] frameValueEvalFactories;
@@ -106,14 +110,6 @@
private final int frameMaxObjects;
- private IFrame copyFrame2;
-
- private IFrame runFrame;
-
- private int runFrameChunkId;
-
- private long runFrameSize;
-
private FrameTupleAccessor tAccess2;
private FrameTupleReference tRef2;
@@ -124,8 +120,6 @@
private int tBeginIdxFrameStartGlobal;
- private long readerPosFrameStartGlobal;
-
WindowNestedPlansPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories,
@@ -134,9 +128,11 @@
IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory,
IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects, int[] projectionColumns,
int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
- int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+ int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx,
+ int memSizeInFrames, SourceLocation sourceLoc) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
- runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
+ memSizeInFrames, sourceLoc);
this.frameValueEvalFactories = frameValueEvalFactories;
this.frameValueExists = frameValueEvalFactories != null && frameValueEvalFactories.length > 0;
this.frameStartEvalFactories = frameStartEvalFactories;
@@ -158,7 +154,6 @@
@Override
protected void init() throws HyracksDataException {
super.init();
-
if (frameValueExists) {
frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
@@ -183,9 +178,6 @@
frameOffsetPointable = VoidPointable.FACTORY.createPointable();
bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
}
-
- runFrame = new VSizeFrame(ctx);
- copyFrame2 = new VSizeFrame(ctx);
tAccess2 = new FrameTupleAccessor(inputRecordDesc);
tRef2 = new FrameTupleReference();
}
@@ -195,31 +187,22 @@
super.beginPartitionImpl();
chunkIdxFrameStartGlobal = -1;
tBeginIdxFrameStartGlobal = -1;
- readerPosFrameStartGlobal = -1;
- runFrameChunkId = -1;
}
@Override
- protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
- boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
+ protected void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException {
+ partitionReader.savePosition(PARTITION_POSITION_SLOT);
- long readerPos = -1;
int nChunks = getPartitionChunkCount();
- if (nChunks > 1) {
- readerPos = reader.position();
- if (chunkIdx == 0) {
- ByteBuffer curFrameBuffer = curFrame.getBuffer();
- int pos = curFrameBuffer.position();
- copyFrame2.ensureFrameSize(curFrameBuffer.capacity());
- FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer());
- curFrameBuffer.position(pos);
- }
- }
+ boolean isFirstChunkInPartition = chunkIdx == 0;
- tAccess.reset(curFrame.getBuffer());
+ tAccess.reset(chunkFrame.getBuffer());
int tBeginIdx = getTupleBeginIdx(chunkIdx);
int tEndIdx = getTupleEndIdx(chunkIdx);
+
for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) {
+ boolean isFirstTupleInPartition = isFirstChunkInPartition && tIdx == tBeginIdx;
+
tRef.reset(tAccess, tIdx);
// running aggregates
@@ -245,34 +228,23 @@
nestedAggInit();
+ boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0;
int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1;
- if (nChunks > 1) {
- reader.seek(frameStartForward ? readerPosFrameStartGlobal : 0);
+
+ if (chunkIdxInnerStart < nChunks) {
+ if (frameStartForward && !isFirstTupleInPartition) {
+ partitionReader.restorePosition(FRAME_POSITION_SLOT);
+ } else {
+ partitionReader.rewind();
+ }
}
int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1;
- long readerPosFrameStartLocal = -1;
frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
- long readerPosFrameInner;
- IFrame frameInner;
- if (chunkIdxInner == 0) {
- // first chunk's frame is always in memory
- frameInner = chunkIdx == 0 ? curFrame : copyFrame2;
- readerPosFrameInner = 0;
- } else {
- readerPosFrameInner = reader.position();
- if (runFrameChunkId == chunkIdxInner) {
- // runFrame has this chunk, so just advance the reader
- reader.seek(readerPosFrameInner + runFrameSize);
- } else {
- reader.nextFrame(runFrame);
- runFrameSize = reader.position() - readerPosFrameInner;
- runFrameChunkId = chunkIdxInner;
- }
- frameInner = runFrame;
- }
+ partitionReader.savePosition(TMP_POSITION_SLOT);
+ IFrame frameInner = partitionReader.nextFrame(false);
tAccess2.reset(frameInner.getBuffer());
int tBeginIdxInner;
@@ -294,17 +266,19 @@
// skip if value < start
continue;
}
+ // inside the frame
if (chunkIdxFrameStartLocal < 0) {
- // save position of the first tuple that matches the frame start.
- // we'll continue from it in the next frame iteration
+ // save position of the first tuple in this frame
+ // will continue from it in the next frame iteration
chunkIdxFrameStartLocal = chunkIdxInner;
tBeginIdxFrameStartLocal = tIdxInner;
- readerPosFrameStartLocal = readerPosFrameInner;
+ partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT);
}
}
if (frameEndExists
&& frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
- // skip and exit if value > end
+ // value > end => beyond the frame end
+ // exit the frame loop
break frame_loop;
}
}
@@ -331,27 +305,22 @@
}
}
- nestedAggOutputFinalResult(tupleBuilder);
- appendToFrameFromTupleBuilder(tupleBuilder);
-
if (frameStartIsMonotonic) {
- frameStartForward = true;
if (chunkIdxFrameStartLocal >= 0) {
chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal;
tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal;
- readerPosFrameStartGlobal = readerPosFrameStartLocal;
} else {
- // frame start not found, set start beyond the last chunk
+ // frame start not found, set it beyond the last chunk
chunkIdxFrameStartGlobal = nChunks;
tBeginIdxFrameStartGlobal = 0;
- readerPosFrameStartGlobal = 0;
}
}
+
+ nestedAggOutputFinalResult(tupleBuilder);
+ appendToFrameFromTupleBuilder(tupleBuilder);
}
- if (nChunks > 1) {
- reader.seek(readerPos);
- }
+ partitionReader.restorePosition(PARTITION_POSITION_SLOT);
}
private boolean isExcluded() throws HyracksDataException {
@@ -368,4 +337,9 @@
}
return true;
}
+
+ @Override
+ protected int getPartitionReaderSlotCount() {
+ return PARTITION_READER_SLOT_COUNT;
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
index e550d65..fe7e93f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
@@ -19,21 +19,17 @@
package org.apache.hyracks.algebricks.runtime.operators.win;
-import java.nio.ByteBuffer;
-
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.storage.common.MultiComparator;
/**
@@ -41,7 +37,15 @@
* as well as regular aggregates (in nested plans) over accumulating window frames
* (unbounded preceding to current row or N following).
*/
-class WindowNestedPlansRunningPushRuntime extends AbstractWindowNestedPlansPushRuntime {
+final class WindowNestedPlansRunningPushRuntime extends AbstractWindowNestedPlansPushRuntime {
+
+ private static final int PARTITION_POSITION_SLOT = 0;
+
+ private static final int FRAME_POSITION_SLOT = 1;
+
+ private static final int TMP_POSITION_SLOT = 2;
+
+ private static final int PARTITION_READER_SLOT_COUNT = TMP_POSITION_SLOT + 1;
private final IScalarEvaluatorFactory[] frameValueEvalFactories;
@@ -61,14 +65,6 @@
private final int frameMaxObjects;
- private IFrame copyFrame2;
-
- private IFrame runFrame;
-
- private int runFrameChunkId;
-
- private long runFrameSize;
-
private FrameTupleAccessor tAccess2;
private FrameTupleReference tRef2;
@@ -77,8 +73,6 @@
private int tBeginIdxFrameEndGlobal;
- private long readerPosFrameEndGlobal;
-
private int toWrite;
WindowNestedPlansRunningPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
@@ -86,9 +80,11 @@
IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories,
int frameMaxObjects, int[] projectionColumns, int[] runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
- WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+ WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx, int memSizeInFrames,
+ SourceLocation sourceLoc) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
- runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
+ memSizeInFrames, sourceLoc);
this.frameValueEvalFactories = frameValueEvalFactories;
this.frameEndEvalFactories = frameEndEvalFactories;
this.frameValueComparatorFactories = frameValueComparatorFactories;
@@ -98,15 +94,11 @@
@Override
protected void init() throws HyracksDataException {
super.init();
-
frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
frameValuePointables = createPointables(frameValueEvalFactories.length);
frameEndEvals = createEvaluators(frameEndEvalFactories, ctx);
frameEndPointables = createPointables(frameEndEvalFactories.length);
-
- runFrame = new VSizeFrame(ctx);
- copyFrame2 = new VSizeFrame(ctx);
tAccess2 = new FrameTupleAccessor(inputRecordDesc);
tRef2 = new FrameTupleReference();
}
@@ -117,32 +109,25 @@
nestedAggInit();
chunkIdxFrameEndGlobal = 0;
tBeginIdxFrameEndGlobal = -1;
- readerPosFrameEndGlobal = 0;
- runFrameChunkId = -1;
toWrite = frameMaxObjects;
}
@Override
- protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
- long readerPos = -1;
+ protected void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException {
+ partitionReader.savePosition(PARTITION_POSITION_SLOT);
+
int nChunks = getPartitionChunkCount();
- if (nChunks > 1) {
- readerPos = reader.position();
- if (chunkIdx == 0) {
- ByteBuffer curFrameBuffer = curFrame.getBuffer();
- int pos = curFrameBuffer.position();
- copyFrame2.ensureFrameSize(curFrameBuffer.capacity());
- FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer());
- curFrameBuffer.position(pos);
- }
- }
+ boolean isFirstChunkInPartition = chunkIdx == 0;
+ boolean isLastChunkInPartition = chunkIdx == nChunks - 1;
- boolean isLastChunk = chunkIdx == nChunks - 1;
-
- tAccess.reset(curFrame.getBuffer());
+ tAccess.reset(chunkFrame.getBuffer());
int tBeginIdx = getTupleBeginIdx(chunkIdx);
int tEndIdx = getTupleEndIdx(chunkIdx);
+
for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) {
+ boolean isFirstTupleInPartition = isFirstChunkInPartition && tIdx == tBeginIdx;
+ boolean isLastTupleInPartition = isLastChunkInPartition && tIdx == tEndIdx;
+
tRef.reset(tAccess, tIdx);
// running aggregates
@@ -153,40 +138,28 @@
int chunkIdxInnerStart = chunkIdxFrameEndGlobal;
int tBeginIdxInnerStart = tBeginIdxFrameEndGlobal;
- if (nChunks > 1) {
- reader.seek(readerPosFrameEndGlobal);
+
+ if (chunkIdxInnerStart < nChunks) {
+ if (!isFirstTupleInPartition) {
+ partitionReader.restorePosition(FRAME_POSITION_SLOT);
+ } else {
+ partitionReader.rewind();
+ }
}
int chunkIdxFrameEndLocal = -1, tBeginIdxFrameEndLocal = -1;
- long readerPosFrameEndLocal = -1;
frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
- long readerPosFrameInner;
- IFrame frameInner;
- if (chunkIdxInner == 0) {
- // first chunk's frame is always in memory
- frameInner = chunkIdx == 0 ? curFrame : copyFrame2;
- readerPosFrameInner = 0;
- } else {
- readerPosFrameInner = reader.position();
- if (runFrameChunkId == chunkIdxInner) {
- // runFrame has this chunk, so just advance the reader
- reader.seek(readerPosFrameInner + runFrameSize);
- } else {
- reader.nextFrame(runFrame);
- runFrameSize = reader.position() - readerPosFrameInner;
- runFrameChunkId = chunkIdxInner;
- }
- frameInner = runFrame;
- }
+ partitionReader.savePosition(TMP_POSITION_SLOT);
+ IFrame frameInner = partitionReader.nextFrame(false);
tAccess2.reset(frameInner.getBuffer());
int tBeginIdxInner;
- if (tBeginIdxInnerStart < 0) {
- tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
- } else {
+ if (tBeginIdxInnerStart >= 0) {
tBeginIdxInner = tBeginIdxInnerStart;
tBeginIdxInnerStart = -1;
+ } else {
+ tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
}
int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
@@ -194,14 +167,14 @@
tRef2.reset(tAccess2, tIdxInner);
evaluate(frameValueEvals, tRef2, frameValuePointables);
+
if (frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
- // save position of the tuple that matches the frame end.
- // we'll continue from it in the next outer iteration
+ // value > end => beyond the frame end
+ // save position of the current tuple, will continue from it in the next outer iteration
chunkIdxFrameEndLocal = chunkIdxInner;
tBeginIdxFrameEndLocal = tIdxInner;
- readerPosFrameEndLocal = readerPosFrameInner;
-
- // skip and exit if value > end
+ partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT);
+ // exit the frame loop
break frame_loop;
}
@@ -213,28 +186,28 @@
}
}
- boolean isLastTuple = isLastChunk && tIdx == tEndIdx;
- if (isLastTuple) {
+ if (chunkIdxFrameEndLocal >= 0) {
+ chunkIdxFrameEndGlobal = chunkIdxFrameEndLocal;
+ tBeginIdxFrameEndGlobal = tBeginIdxFrameEndLocal;
+ } else {
+ // frame end not found, set it beyond the last chunk
+ chunkIdxFrameEndGlobal = nChunks;
+ tBeginIdxFrameEndGlobal = 0;
+ }
+
+ if (isLastTupleInPartition) {
nestedAggOutputFinalResult(tupleBuilder);
} else {
nestedAggOutputPartialResult(tupleBuilder);
}
appendToFrameFromTupleBuilder(tupleBuilder);
-
- if (chunkIdxFrameEndLocal >= 0) {
- chunkIdxFrameEndGlobal = chunkIdxFrameEndLocal;
- tBeginIdxFrameEndGlobal = tBeginIdxFrameEndLocal;
- readerPosFrameEndGlobal = readerPosFrameEndLocal;
- } else {
- // could not find the end, set beyond the last chunk
- chunkIdxFrameEndGlobal = nChunks;
- tBeginIdxFrameEndGlobal = 0;
- readerPosFrameEndGlobal = 0;
- }
}
- if (nChunks > 1) {
- reader.seek(readerPos);
- }
+ partitionReader.restorePosition(PARTITION_POSITION_SLOT);
+ }
+
+ @Override
+ protected int getPartitionReaderSlotCount() {
+ return PARTITION_READER_SLOT_COUNT;
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
index ddeaf2b..53692d1 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
@@ -50,10 +50,10 @@
IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories,
int frameMaxObjects, int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
- WindowAggregatorDescriptorFactory nestedAggFactory) {
+ WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
- nestedAggFactory);
+ nestedAggFactory, memSizeInFrames);
this.frameValueEvalFactories = frameValueEvalFactories;
this.frameValueComparatorFactories = frameValueComparatorFactories;
this.frameEndEvalFactories = frameEndEvalFactories;
@@ -65,7 +65,7 @@
return new WindowNestedPlansRunningPushRuntime(partitionColumns, partitionComparatorFactories,
orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories, frameEndEvalFactories,
frameMaxObjects, projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
- nestedAggFactory, ctx);
+ nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
index f754b91..4a7c837 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
@@ -68,10 +68,10 @@
IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects,
int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
- WindowAggregatorDescriptorFactory nestedAggFactory) {
+ WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
- nestedAggFactory);
+ nestedAggFactory, memSizeInFrames);
this.frameValueEvalFactories = frameValueEvalFactories;
this.frameValueComparatorFactories = frameValueComparatorFactories;
this.frameStartEvalFactories = frameStartEvalFactories;
@@ -92,7 +92,7 @@
frameStartEvalFactories, frameStartIsMonotonic, frameEndEvalFactories, frameExcludeEvalFactories,
frameExcludeNegationStartIdx, frameExcludeComparatorFactories, frameOffsetEvalFactory,
binaryIntegerInspectorFactory, frameMaxObjects, projectionList, runningAggOutColumns,
- runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+ runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
index b25a36c..d97c855 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
@@ -55,9 +56,11 @@
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int frameMaxObjects, int[] projectionColumns,
int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
- int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+ int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx,
+ int memSizeInFrames, SourceLocation sourceLoc) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
- runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
+ memSizeInFrames, sourceLoc);
this.frameMaxObjects = frameMaxObjects;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
index 0f7d9cf..b2dfbca 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
@@ -41,10 +41,10 @@
IBinaryComparatorFactory[] orderComparatorFactories, int frameMaxObjects,
int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
- WindowAggregatorDescriptorFactory nestedAggFactory) {
+ WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
- nestedAggFactory);
+ nestedAggFactory, memSizeInFrames);
this.frameMaxObjects = frameMaxObjects;
}
@@ -52,7 +52,7 @@
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
return new WindowNestedPlansUnboundedPushRuntime(partitionColumns, partitionComparatorFactories,
orderComparatorFactories, frameMaxObjects, projectionList, runningAggOutColumns, runningAggFactories,
- nestedAggOutSchemaSize, nestedAggFactory, ctx);
+ nestedAggOutSchemaSize, nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionReader.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionReader.java
new file mode 100644
index 0000000..208effc
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+interface WindowPartitionReader {
+
+ void open() throws HyracksDataException;
+
+ IFrame nextFrame(boolean primaryScan) throws HyracksDataException;
+
+ void close() throws HyracksDataException;
+
+ // position manipulation
+
+ void rewind();
+
+ void savePosition(int slotNo);
+
+ void restorePosition(int slotNo);
+
+ void copyPosition(int slotFrom, int slotTo);
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionWriter.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionWriter.java
new file mode 100644
index 0000000..b3eb317
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionWriter.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+
+final class WindowPartitionWriter {
+
+ private final IHyracksTaskContext ctx;
+
+ private final String fileNamePrefix;
+
+ private final SourceLocation sourceLoc;
+
+ private final IFrame[] writerFrames;
+
+ private int writerFrameCount;
+
+ private long writerFirstFrameId;
+
+ private long writerLastFrameId;
+
+ private RunFileWriter fileWriter;
+
+ private final AbstractWindowPartitionReader partitionReader;
+
+ WindowPartitionWriter(IHyracksTaskContext ctx, int memSizeInFrames, String fileNamePrefix,
+ int readerPositionStoreSize, SourceLocation sourceLoc) throws HyracksDataException {
+ this.ctx = ctx;
+ this.fileNamePrefix = fileNamePrefix;
+ this.sourceLoc = sourceLoc;
+ partitionReader = readerPositionStoreSize < 1 ? new WindowPartitionForwardReader()
+ : new WindowPartitionSeekableReader(readerPositionStoreSize);
+ int writerFrameBudget = memSizeInFrames - partitionReader.getReservedFrameCount();
+ if (writerFrameBudget < 1) {
+ throw new IllegalArgumentException(String.valueOf(memSizeInFrames));
+ }
+ writerFrames = new IFrame[writerFrameBudget];
+ // Allocate one writer frame here. Remaining frames will be allocated lazily while writing
+ allocateFrames(writerFrames, 1);
+ writerFirstFrameId = writerLastFrameId = -1;
+ }
+
+ void close() throws HyracksDataException {
+ try {
+ partitionReader.closeFileReader();
+ } finally {
+ if (fileWriter != null) {
+ fileWriter.close();
+ }
+ }
+ }
+
+ void reset() {
+ writerFrameCount = 0;
+ if (fileWriter != null) {
+ fileWriter.rewind();
+ }
+ }
+
+ void nextFrame(long frameId, ByteBuffer frameBuffer) throws HyracksDataException {
+ if (frameId < 0) {
+ throw new IllegalArgumentException(String.valueOf(frameId));
+ }
+ if (writerFrameCount == 0) {
+ if (writerFirstFrameId != frameId) {
+ copyToFrame(frameBuffer, writerFrames[0]);
+ writerFirstFrameId = frameId;
+ }
+ } else if (writerFrameCount < writerFrames.length) {
+ IFrame writerFrame = writerFrames[writerFrameCount];
+ if (writerFrame == null) {
+ writerFrames[writerFrameCount] = writerFrame = new VSizeFrame(ctx);
+ }
+ copyToFrame(frameBuffer, writerFrame);
+ } else {
+ if (fileWriter == null) {
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(fileNamePrefix);
+ fileWriter = new RunFileWriter(file, ctx.getIoManager());
+ fileWriter.open();
+ }
+ int pos = frameBuffer.position();
+ frameBuffer.position(0);
+ fileWriter.nextFrame(frameBuffer);
+ frameBuffer.position(pos);
+ }
+ writerLastFrameId = frameId;
+ writerFrameCount++;
+ }
+
+ WindowPartitionReader getReader() {
+ return partitionReader;
+ }
+
+ private void allocateFrames(IFrame[] outFrames, int count) throws HyracksDataException {
+ for (int i = 0; i < count; i++) {
+ outFrames[i] = new VSizeFrame(ctx);
+ }
+ }
+
+ private static void copyToFrame(ByteBuffer fromBuffer, IFrame toFrame) throws HyracksDataException {
+ toFrame.ensureFrameSize(fromBuffer.capacity());
+ int fromPosition = fromBuffer.position();
+ FrameUtils.copyAndFlip(fromBuffer, toFrame.getBuffer());
+ fromBuffer.position(fromPosition);
+ }
+
+ private static <T> void swap(T[] array1, int index1, T[] array2, int index2) {
+ T item1 = array1[index1];
+ array1[index1] = array2[index2];
+ array2[index2] = item1;
+ }
+
+ private abstract class AbstractWindowPartitionReader implements WindowPartitionReader {
+
+ int readerFrameIdx = -1;
+
+ GeneratedRunFileReader fileReader;
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (readerFrameIdx >= 0) {
+ throw new IllegalStateException(String.valueOf(readerFrameIdx));
+ }
+ readerFrameIdx = 0;
+
+ if (writerFrameCount > writerFrames.length) {
+ openFileReader();
+ }
+ }
+
+ @Override
+ public final void close() throws HyracksDataException {
+ if (readerFrameIdx != writerFrameCount) {
+ throw new IllegalStateException();
+ }
+
+ // closeImpl() must guarantee that first writer frame will contain content of the last partition frame
+ closeImpl();
+ writerFirstFrameId = writerLastFrameId;
+
+ readerFrameIdx = -1;
+
+ if (writerFrameCount > writerFrames.length) {
+ closeFileReader();
+ }
+ }
+
+ void openFileReader() throws HyracksDataException {
+ if (fileReader != null) {
+ throw new IllegalStateException();
+ }
+ fileReader = fileWriter.createReader();
+ fileReader.open();
+ }
+
+ void closeFileReader() throws HyracksDataException {
+ GeneratedRunFileReader r = fileReader;
+ if (r != null) {
+ fileReader = null;
+ r.close();
+ }
+ }
+
+ void readFromFileReader(IFrame outFrame) throws HyracksDataException {
+ if (!fileReader.nextFrame(outFrame)) {
+ throw HyracksDataException.create(ErrorCode.EOF, sourceLoc);
+ }
+ }
+
+ @Override
+ public final IFrame nextFrame(boolean primaryScan) throws HyracksDataException {
+ if (readerFrameIdx < 0) {
+ throw new IllegalStateException();
+ }
+ if (readerFrameIdx >= writerFrameCount) {
+ throw HyracksDataException.create(ErrorCode.EOF, sourceLoc);
+ }
+ IFrame frame = nextFrameImpl(primaryScan);
+ readerFrameIdx++;
+ return frame;
+ }
+
+ abstract void closeImpl() throws HyracksDataException;
+
+ abstract IFrame nextFrameImpl(boolean primaryScan) throws HyracksDataException;
+
+ abstract int getReservedFrameCount();
+ }
+
+ private final class WindowPartitionForwardReader extends AbstractWindowPartitionReader {
+
+ @Override
+ IFrame nextFrameImpl(boolean primaryScan) throws HyracksDataException {
+ if (!primaryScan) {
+ throw new IllegalArgumentException();
+ }
+ if (readerFrameIdx < writerFrames.length) {
+ return writerFrames[readerFrameIdx];
+ } else {
+ IFrame writerFrame0 = writerFrames[0];
+ readFromFileReader(writerFrame0);
+ return writerFrame0;
+ }
+ }
+
+ @Override
+ void closeImpl() {
+ int endFrameIdx = readerFrameIdx - 1;
+ if (endFrameIdx > 0 && endFrameIdx < writerFrames.length) {
+ // last partition frame is in writerFrames -> make it the first one
+ swap(writerFrames, 0, writerFrames, endFrameIdx);
+ }
+ }
+
+ @Override
+ public void savePosition(int slotNo) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void copyPosition(int slotFrom, int slotTo) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void restorePosition(int slotNo) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void rewind() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ int getReservedFrameCount() {
+ return 0;
+ }
+ }
+
+ private final class WindowPartitionSeekableReader extends AbstractWindowPartitionReader {
+
+ private final IFrame[] fileFrames;
+
+ private final long[] fileFrameIdxs;
+
+ private final long[] fileFrameSizes;
+
+ private final long[] filePositionStore;
+
+ private final int[] readerFrameIdxStore;
+
+ private WindowPartitionSeekableReader(int positionStoreSize) throws HyracksDataException {
+ fileFrames = new IFrame[2]; // run file frames: one for primary scan, another for non-primary
+ allocateFrames(fileFrames, fileFrames.length);
+ fileFrameIdxs = new long[fileFrames.length];
+ fileFrameSizes = new long[fileFrames.length];
+ filePositionStore = new long[positionStoreSize];
+ readerFrameIdxStore = new int[positionStoreSize];
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ super.open();
+ Arrays.fill(fileFrameIdxs, -1);
+ Arrays.fill(filePositionStore, -1);
+ Arrays.fill(readerFrameIdxStore, -1);
+ }
+
+ @Override
+ IFrame nextFrameImpl(boolean primaryScan) throws HyracksDataException {
+ if (readerFrameIdx < writerFrames.length) {
+ return writerFrames[readerFrameIdx];
+ } else {
+ int fileFrameSlot = primaryScan ? 0 : 1;
+ IFrame fileFrameRef = fileFrames[fileFrameSlot];
+ long filePosition = fileReader.position();
+ if (readerFrameIdx == fileFrameIdxs[fileFrameSlot]) {
+ fileReader.seek(filePosition + fileFrameSizes[fileFrameSlot]);
+ } else {
+ readFromFileReader(fileFrameRef);
+ fileFrameSizes[fileFrameSlot] = fileReader.position() - filePosition;
+ fileFrameIdxs[fileFrameSlot] = readerFrameIdx;
+ }
+ return fileFrameRef;
+ }
+ }
+
+ @Override
+ public void closeImpl() {
+ int endFrameIdx = readerFrameIdx - 1;
+ if (endFrameIdx >= writerFrames.length) {
+ // last partition frame was in the run file -> get contents from the file frame
+ swap(writerFrames, 0, fileFrames, 0);
+ } else if (endFrameIdx > 0) {
+ // last partition frame is in writerFrames -> make it the first one
+ swap(writerFrames, 0, writerFrames, endFrameIdx);
+ }
+ }
+
+ @Override
+ public void savePosition(int slotNo) {
+ readerFrameIdxStore[slotNo] = readerFrameIdx;
+ filePositionStore[slotNo] = fileReader != null ? fileReader.position() : 0;
+ }
+
+ @Override
+ public void copyPosition(int slotFrom, int slotTo) {
+ readerFrameIdxStore[slotTo] = readerFrameIdxStore[slotFrom];
+ filePositionStore[slotTo] = filePositionStore[slotFrom];
+ }
+
+ @Override
+ public void restorePosition(int slotNo) {
+ seek(readerFrameIdxStore[slotNo], filePositionStore[slotNo]);
+ }
+
+ @Override
+ public void rewind() {
+ seek(0, 0);
+ }
+
+ private void seek(int readerFrameIdx, long filePosition) {
+ this.readerFrameIdx = readerFrameIdx;
+ if (fileReader != null) {
+ fileReader.seek(filePosition);
+ }
+ }
+
+ @Override
+ int getReservedFrameCount() {
+ return fileFrames.length;
+ }
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
index bf71ed9..f7f1a25 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
/**
* Runtime for window operators that evaluates running aggregates without partition materialization.
@@ -33,9 +34,10 @@
WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
- IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) {
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx,
+ SourceLocation sourceLoc) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
- runningAggOutColumns, runningAggFactories, ctx);
+ runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
}
@Override
@@ -47,7 +49,7 @@
protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
throws HyracksDataException {
tAccess.reset(frameBuffer);
- produceTuples(tAccess, tBeginIdx, tEndIdx);
+ produceTuples(tAccess, tBeginIdx, tEndIdx, tRef);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
index ded399f..2d1cdde 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
@@ -43,7 +43,7 @@
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
return new WindowSimplePushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
- projectionList, runningAggOutColumns, runningAggFactories, ctx);
+ projectionList, runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index d6860f7..4ff0df3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -153,6 +153,7 @@
public static final int NO_RANGEMAP_PRODUCED = 117;
public static final int RANGEMAP_NOT_FOUND = 118;
public static final int UNSUPPORTED_WINDOW_SPEC = 119;
+ public static final int EOF = 120;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index e95bf76..5c9863c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -136,6 +136,7 @@
117 = No range map produced for parallel sort
118 = Range map was not found for parallel sort
119 = Unsupported window specification: PARTITION BY %1$s, ORDER BY %2$s
+120 = End of file
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index 03fdb49..50cacb2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -73,7 +73,7 @@
int readLength = ioManager.syncRead(handle, readPtr, frame.getBuffer());
if (readLength <= 0) {
- throw new HyracksDataException("Premature end of file");
+ throw HyracksDataException.create(ErrorCode.EOF);
}
readPtr += readLength;
frame.ensureFrameSize(frame.getMinSize() * FrameHelper.deserializeNumOfMinFrame(frame.getBuffer()));
@@ -81,7 +81,7 @@
if (readPtr < size) {
readLength = ioManager.syncRead(handle, readPtr, frame.getBuffer());
if (readLength < 0) {
- throw new HyracksDataException("Premature end of file");
+ throw HyracksDataException.create(ErrorCode.EOF);
}
readPtr += readLength;
}