[ASTERIXDB-2552][RT] Implement micro external sort for subplans
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This patch is to change the in memory sort used in subplans to
be a micro external sort to avoid out of memory error.
- added reset() to the runs merger to allow reusing the runs merger.
- renamed "InMemoryStableSortPOperator" to "MicroStableSortPOperator".
- changed the tag from "IN_MEMORY_STABLE_SORT" to "MICRO_STABLE_SORT".
- added test cases.
Change-Id: I930849d644c60d461d2869c9773b85e49b46fbdb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3353
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/micro_external_sort/micro_external_sort.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/micro_external_sort/micro_external_sort.sqlpp
new file mode 100644
index 0000000..a075cd1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/micro_external_sort/micro_external_sort.sqlpp
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// testing micro external sort in a subplan
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+set `compiler.sortmemory` "130KB";
+
+with ds AS (
+from range(0,89999) i
+select value {"id": i, "a":
+CASE ( i % 3)
+WHEN 0 THEN "one"
+WHEN 1 THEN "two"
+WHEN 2 THEN "three"
+END,
+"b": CASE ( (i div 3) % 3)
+WHEN 0 THEN "SUM_1s"
+WHEN 1 THEN "SUM_2s"
+WHEN 2 THEN "SUM_3s" END,
+"c": CASE ((i div 3) % 3)
+WHEN 0 THEN 1
+WHEN 1 THEN 2
+WHEN 2 THEN 3 END})
+SELECT `group`,
+(SELECT group_num, sum(g.d.c) AS sum
+FROM g
+GROUP BY g.d.b AS group_num
+ORDER BY sum
+) AS sum_per_group
+FROM ds d
+GROUP BY d.a AS `group`
+GROUP AS g
+ORDER BY `group`;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan
index 2918193..f3dbc01 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan
@@ -10,7 +10,7 @@
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- IN_MEMORY_STABLE_SORT [$$200(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$200(ASC)] |LOCAL|
-- ASSIGN |LOCAL|
-- UNNEST |LOCAL|
-- SUBPLAN |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.10.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.10.plan
index 6b66a53..f9d98dd 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.10.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.10.plan
@@ -7,7 +7,7 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$62(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$62(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.11.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.11.plan
index dd3bb8f..55e2f55 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.11.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.11.plan
@@ -7,13 +7,13 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$62(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$62(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$67(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$67(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.12.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.12.plan
index 1766fc3..469fa47 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.12.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.12.plan
@@ -11,7 +11,7 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$66(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$66(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.13.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.13.plan
index 98c4c4f..d89ba4a 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.13.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.13.plan
@@ -7,7 +7,7 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$76(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$76(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
{
@@ -17,7 +17,7 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$86(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$86(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.14.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.14.plan
index b563fef..aa3554b 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.14.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.14.plan
@@ -7,7 +7,7 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$100(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$100(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
{
@@ -17,13 +17,13 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$110(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$110(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$105(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$105(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.16.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.16.plan
index ab82bf0..79505b4 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.16.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.16.plan
@@ -7,7 +7,7 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$x(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$x(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
{
@@ -18,13 +18,13 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$z(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$z(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$y(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$y(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.17.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.17.plan
index 6249d46..61ecb60 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.17.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.17.plan
@@ -10,13 +10,13 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$106(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$106(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$111(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$111(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -58,13 +58,13 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$124(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$124(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$129(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$129(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.9.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.9.plan
index 6d8202e..4199770 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.9.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-sql-sugar/distinct_mixed/distinct_mixed.9.plan
@@ -7,7 +7,7 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$58(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$58(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/sugar-06-distinct.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/sugar-06-distinct.plan
index f4eea8a..aeeb046 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/sugar-06-distinct.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/group-by/sugar-06-distinct.plan
@@ -12,7 +12,7 @@
{
-- AGGREGATE |LOCAL|
-- MICRO_PRE_SORTED_DISTINCT_BY |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$45(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$45(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
index ef733cb..b85e9b0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
@@ -5,7 +5,7 @@
{
-- ASSIGN |LOCAL|
-- ASSIGN |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$16(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$16(ASC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- COMMIT |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/micro_external_sort/micro_external_sort.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/micro_external_sort/micro_external_sort.plan
new file mode 100644
index 0000000..4f8ff43
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/micro_external_sort/micro_external_sort.plan
@@ -0,0 +1,27 @@
+-- DISTRIBUTE_RESULT |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- STREAM_PROJECT |LOCAL|
+ -- ASSIGN |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- PRE_CLUSTERED_GROUP_BY[$$145] |LOCAL|
+ {
+ -- AGGREGATE |LOCAL|
+ -- MICRO_STABLE_SORT [$$149(ASC)] |LOCAL|
+ -- MICRO_PRE_CLUSTERED_GROUP_BY[$$146] |LOCAL|
+ {
+ -- AGGREGATE |LOCAL|
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- MICRO_STABLE_SORT [$$146(ASC)] |LOCAL|
+ -- ASSIGN |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- STABLE_SORT [$$145(ASC)] |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- STREAM_PROJECT |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/nest_aggregate.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/nest_aggregate.plan
index d4cdff4..0b8a56d 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/nest_aggregate.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/nest_aggregate.plan
@@ -8,13 +8,13 @@
{
-- AGGREGATE |LOCAL|
-- STREAM_LIMIT |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$39(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$39(ASC)] |LOCAL|
-- MICRO_PRE_CLUSTERED_GROUP_BY[$$51] |LOCAL|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- IN_MEMORY_STABLE_SORT [$$51(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$51(ASC)] |LOCAL|
-- STREAM_SELECT |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/nested_loj3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/nested_loj3.plan
index 957a2ee..9232477 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/nested_loj3.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/nested_loj3.plan
@@ -19,7 +19,7 @@
-- STREAM_SELECT |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- IN_MEMORY_STABLE_SORT [$$31(ASC), $$32(ASC), $$33(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$31(ASC), $$32(ASC), $$33(ASC)] |LOCAL|
-- STREAM_SELECT |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
index f2adbf6..241257b 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
@@ -15,7 +15,7 @@
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- IN_MEMORY_STABLE_SORT [$$19(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$19(ASC)] |LOCAL|
-- ASSIGN |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
index 4405b9d..612a9b7 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
@@ -7,7 +7,7 @@
{
-- AGGREGATE |LOCAL|
-- STREAM_LIMIT |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$62(DESC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$62(DESC)] |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-dual-order.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-dual-order.plan
index e8b029a..e7ac5d6 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-dual-order.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-dual-order.plan
@@ -23,7 +23,7 @@
-- PRE_CLUSTERED_GROUP_BY[$$126] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$i(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$i(ASC)] |LOCAL|
-- STREAM_SELECT |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
@@ -113,7 +113,7 @@
-- PRE_CLUSTERED_GROUP_BY[$$129] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$i(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$i(ASC)] |LOCAL|
-- STREAM_SELECT |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-right-ahead.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-right-ahead.plan
index ab700b6..cd036ff 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-right-ahead.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/similarity/jaccard-similarity-join-right-ahead.plan
@@ -23,7 +23,7 @@
-- PRE_CLUSTERED_GROUP_BY[$$78] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$i(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$i(ASC)] |LOCAL|
-- STREAM_SELECT |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
@@ -80,7 +80,7 @@
-- PRE_CLUSTERED_GROUP_BY[$$80] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$i(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$i(ASC)] |LOCAL|
-- STREAM_SELECT |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-1.plan
index f86825e..6e92aec 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-1.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-1.plan
@@ -10,7 +10,7 @@
-- SUBPLAN |UNPARTITIONED|
{
-- AGGREGATE |UNPARTITIONED|
- -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |UNPARTITIONED|
+ -- MICRO_STABLE_SORT [$$j(ASC)] |UNPARTITIONED|
-- UNNEST |UNPARTITIONED|
-- NESTED_TUPLE_SOURCE |UNPARTITIONED|
}
@@ -28,7 +28,7 @@
-- AGGREGATE |LOCAL|
-- STREAM_SELECT |UNPARTITIONED|
-- RUNNING_AGGREGATE |UNPARTITIONED|
- -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |UNPARTITIONED|
+ -- MICRO_STABLE_SORT [$$j(ASC)] |UNPARTITIONED|
-- UNNEST |UNPARTITIONED|
-- NESTED_TUPLE_SOURCE |UNPARTITIONED|
}
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
index 2eb7603..0c0b45d 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
@@ -12,7 +12,7 @@
-- SUBPLAN |LOCAL|
{
-- AGGREGATE |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$j(ASC)] |LOCAL|
-- UNNEST |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
@@ -22,7 +22,7 @@
-- SUBPLAN |PARTITIONED|
{
-- AGGREGATE |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$98(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$98(ASC)] |LOCAL|
-- ASSIGN |LOCAL|
-- UNNEST |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -41,7 +41,7 @@
-- SUBPLAN |LOCAL|
{
-- AGGREGATE |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |LOCAL|
+ -- MICRO_STABLE_SORT [$$j(ASC)] |LOCAL|
-- UNNEST |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/sorting/micro_external_sort/micro_external_sort.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/sorting/micro_external_sort/micro_external_sort.1.query.sqlpp
new file mode 100644
index 0000000..7d100cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/sorting/micro_external_sort/micro_external_sort.1.query.sqlpp
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// testing micro external sort in a subplan that generates runs and merges them
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+set `compiler.sortmemory` "130KB";
+
+with ds AS (
+from range(0,89999) i
+select value {"id": i, "a":
+CASE ( i % 3)
+WHEN 0 THEN "one"
+WHEN 1 THEN "two"
+WHEN 2 THEN "three"
+END,
+"b": CASE ( (i div 3) % 3)
+WHEN 0 THEN "SUM_1s"
+WHEN 1 THEN "SUM_2s"
+WHEN 2 THEN "SUM_3s" END,
+"c": CASE ((i div 3) % 3)
+WHEN 0 THEN 1
+WHEN 1 THEN 2
+WHEN 2 THEN 3 END})
+SELECT `group`,
+(SELECT group_num, sum(g.d.c) AS sum
+FROM g
+GROUP BY g.d.b AS group_num
+ORDER BY sum
+) AS sum_per_group
+FROM ds d
+GROUP BY d.a AS `group`
+GROUP AS g
+ORDER BY `group`;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/sorting/micro_external_sort/micro_external_sort.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/sorting/micro_external_sort/micro_external_sort.1.adm
new file mode 100644
index 0000000..6d57f23
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/sorting/micro_external_sort/micro_external_sort.1.adm
@@ -0,0 +1,3 @@
+{ "group": "one", "sum_per_group": [ { "group_num": "SUM_1s", "sum": 10000 }, { "group_num": "SUM_2s", "sum": 20000 }, { "group_num": "SUM_3s", "sum": 30000 } ] }
+{ "group": "three", "sum_per_group": [ { "group_num": "SUM_1s", "sum": 10000 }, { "group_num": "SUM_2s", "sum": 20000 }, { "group_num": "SUM_3s", "sum": 30000 } ] }
+{ "group": "two", "sum_per_group": [ { "group_num": "SUM_1s", "sum": 10000 }, { "group_num": "SUM_2s", "sum": 20000 }, { "group_num": "SUM_3s", "sum": 30000 } ] }
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 404a8dc..8e1f77f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -36,7 +36,7 @@
HDFS_READER,
HYBRID_HASH_JOIN,
IN_MEMORY_HASH_JOIN,
- IN_MEMORY_STABLE_SORT,
+ MICRO_STABLE_SORT,
INDEX_BULKLOAD,
INDEX_INSERT_DELETE,
INSERT_DELETE,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 0c08369..81852d4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -49,10 +49,12 @@
public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperator {
- protected OrderColumn[] sortColumns;
- protected ILocalStructuralProperty orderProp;
+ final int maxNumberOfFrames;
+ OrderColumn[] sortColumns;
+ ILocalStructuralProperty orderProp;
- public AbstractStableSortPOperator() {
+ AbstractStableSortPOperator(int maxNumberOfFrames) {
+ this.maxNumberOfFrames = maxNumberOfFrames;
}
public OrderColumn[] getSortColumns() {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
similarity index 93%
rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java
rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
index d304421..c6a7d9d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
@@ -37,14 +37,15 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-public class InMemoryStableSortPOperator extends AbstractStableSortPOperator {
+public class MicroStableSortPOperator extends AbstractStableSortPOperator {
- public InMemoryStableSortPOperator() {
+ public MicroStableSortPOperator(int maxNumberOfFrames) {
+ super(maxNumberOfFrames);
}
@Override
public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.IN_MEMORY_STABLE_SORT;
+ return PhysicalOperatorTag.MICRO_STABLE_SORT;
}
@Override
@@ -79,7 +80,7 @@
i++;
}
- IPushRuntimeFactory runtime = new InMemorySortRuntimeFactory(sortFields, nkcf, comps, null);
+ IPushRuntimeFactory runtime = new InMemorySortRuntimeFactory(sortFields, nkcf, comps, null, maxNumberOfFrames);
builder.contributeMicroOperator(op, runtime, recDescriptor);
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index d13c51a..9567e5b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -47,16 +47,14 @@
public class StableSortPOperator extends AbstractStableSortPOperator {
- private int maxNumberOfFrames;
- private int topK;
+ private final int topK;
public StableSortPOperator(int maxNumberOfFrames) {
this(maxNumberOfFrames, -1);
}
public StableSortPOperator(int maxNumberOfFrames, int topK) {
- super();
- this.maxNumberOfFrames = maxNumberOfFrames;
+ super(maxNumberOfFrames);
this.topK = topK;
}
@@ -86,6 +84,7 @@
IVariableTypeEnvironment env = context.getTypeEnvironment(op);
int i = 0;
+ // TODO(ali): should refactor common code with micro sort op
for (OrderColumn oc : sortColumns) {
LogicalVariable var = oc.getColumn();
sortFields[i] = opSchema.findVariable(var);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 67c1f1d..a011abf 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -66,7 +66,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroStableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
@@ -449,7 +449,7 @@
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.ORDER
|| (op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.STABLE_SORT
- && op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT)
+ && op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.MICRO_STABLE_SORT)
|| delivered.getLocalProperties() == null) {
return false;
}
@@ -555,7 +555,7 @@
oo.setSourceLocation(sourceLoc);
oo.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
if (isMicroOp) {
- oo.setPhysicalOperator(new InMemoryStableSortPOperator());
+ oo.setPhysicalOperator(new MicroStableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
} else {
oo.setPhysicalOperator(new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
index 5864c3b..344e103 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
@@ -89,7 +89,7 @@
}
AbstractPhysicalOperator pOrder1 = (AbstractPhysicalOperator) op2.getPhysicalOperator();
if (pOrder1.getOperatorTag() != PhysicalOperatorTag.STABLE_SORT
- && pOrder1.getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT) {
+ && pOrder1.getOperatorTag() != PhysicalOperatorTag.MICRO_STABLE_SORT) {
return false;
}
// StableSortPOperator sort1 = (StableSortPOperator) pOrder1;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 49e5a0b..f127898 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -86,7 +86,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator;
@@ -94,6 +93,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.LeftOuterUnnestPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreSortedDistinctByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroStableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroUnionAllPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
@@ -271,7 +271,7 @@
if (topLevelOp) {
return new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK());
} else {
- return new InMemoryStableSortPOperator();
+ return new MicroStableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort());
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 2453029..b34b7b9 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -19,39 +19,43 @@
package org.apache.hyracks.algebricks.runtime.operators.sort;
import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
-import org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory;
-import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
-import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager;
-import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
-import org.apache.hyracks.dataflow.std.sort.FrameSorterMergeSort;
+import org.apache.hyracks.dataflow.std.sort.Algorithm;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
private static final long serialVersionUID = 1L;
-
+ private final int framesLimit;
private final int[] sortFields;
private final INormalizedKeyComputerFactory[] keyNormalizerFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
- IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
+ IBinaryComparatorFactory[] comparatorFactories, int[] projectionList, int framesLimit) {
this(sortFields, firstKeyNormalizerFactory != null
? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory } : null, comparatorFactories,
- projectionList);
+ projectionList, framesLimit);
}
public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
- IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
+ IBinaryComparatorFactory[] comparatorFactories, int[] projectionList, int framesLimit) {
super(projectionList);
// Obs: the projection list is currently ignored.
if (projectionList != null) {
@@ -60,52 +64,122 @@
this.sortFields = sortFields;
this.keyNormalizerFactories = keyNormalizerFactories;
this.comparatorFactories = comparatorFactories;
+ this.framesLimit = framesLimit;
}
@Override
public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
- return new AbstractOneInputOneOutputPushRuntime() {
- FrameSorterMergeSort frameSorter = null;
+ InMemorySortPushRuntime pushRuntime = new InMemorySortPushRuntime(ctx);
+ ctx.registerDeallocatable(pushRuntime);
+ return pushRuntime;
+ }
- @Override
- public void open() throws HyracksDataException {
- super.open();
- if (frameSorter == null) {
- IFrameBufferManager manager = new VariableFrameMemoryManager(
- new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
- FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT));
- frameSorter = new FrameSorterMergeSort(ctx, manager, VariableFramePool.UNLIMITED_MEMORY, sortFields,
- keyNormalizerFactories, comparatorFactories, outputRecordDesc);
- }
- frameSorter.reset();
+ private class InMemorySortPushRuntime extends AbstractOneInputOneOutputPushRuntime implements IDeallocatable {
+ final IHyracksTaskContext ctx;
+ ExternalSortRunGenerator runsGenerator = null;
+ ExternalSortRunMerger runsMerger = null;
+ IFrameWriter wrappingWriter = null;
+
+ private InMemorySortPushRuntime(IHyracksTaskContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (runsGenerator == null) {
+ runsGenerator = new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories,
+ comparatorFactories, outputRecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT,
+ framesLimit, Integer.MAX_VALUE);
}
+ // next writer will be opened later when preparing the merger
+ isOpen = true;
+ runsGenerator.open();
+ runsGenerator.getSorter().reset();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- frameSorter.insertFrame(buffer);
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ runsGenerator.nextFrame(buffer);
+ }
- @Override
- public void close() throws HyracksDataException {
- Throwable failure = null;
- if (isOpen) {
- try {
- if (!failed) {
- frameSorter.sort();
- frameSorter.flush(writer);
+ @Override
+ public void close() throws HyracksDataException {
+ Throwable failure = null;
+ if (isOpen) {
+ try {
+ if (!failed) {
+ runsGenerator.close();
+ createOrResetRunsMerger();
+ if (runsGenerator.getRuns().isEmpty()) {
+ wrappingWriter = runsMerger.prepareSkipMergingFinalResultWriter(writer);
+ wrappingWriter.open();
+ if (runsGenerator.getSorter().hasRemaining()) {
+ runsGenerator.getSorter().flush(wrappingWriter);
+ }
+ } else {
+ wrappingWriter = runsMerger.prepareFinalMergeResultWriter(writer);
+ wrappingWriter.open();
+ runsMerger.process(wrappingWriter);
}
+ }
+ } catch (Throwable th) {
+ failure = th;
+ fail(th);
+ } finally {
+ failure = CleanupUtils.close(wrappingWriter, failure);
+ wrappingWriter = null;
+ }
+ }
+ isOpen = false;
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ failed = true;
+ // clean up the runs if some have been generated. double close should be idempotent.
+ if (runsGenerator != null) {
+ List<GeneratedRunFileReader> runs = runsGenerator.getRuns();
+ for (int i = 0, size = runs.size(); i < size; i++) {
+ try {
+ runs.get(i).close();
} catch (Throwable th) {
- failure = th;
- fail(th);
- } finally {
- failure = CleanupUtils.close(writer, failure);
+ // ignore
}
}
- if (failure != null) {
- throw HyracksDataException.create(failure);
+ }
+ if (wrappingWriter != null) {
+ wrappingWriter.fail();
+ }
+ }
+
+ @Override
+ public void deallocate() {
+ if (runsGenerator != null) {
+ try {
+ runsGenerator.getSorter().close();
+ } catch (Exception e) {
+ // ignore
}
}
- };
+ }
+
+ private void createOrResetRunsMerger() {
+ if (runsMerger == null) {
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ INormalizedKeyComputer nmkComputer =
+ keyNormalizerFactories == null ? null : keyNormalizerFactories[0].createNormalizedKeyComputer();
+ runsMerger = new ExternalSortRunMerger(ctx, runsGenerator.getRuns(), sortFields, comparators,
+ nmkComputer, outputRecordDesc, framesLimit, Integer.MAX_VALUE);
+ } else {
+ runsMerger.reset(runsGenerator.getRuns());
+ }
+ }
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 99967c1..e9b3fc3 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -720,7 +720,7 @@
// the algebricks op.
InMemorySortRuntimeFactory sort =
new InMemorySortRuntimeFactory(new int[] { 1 }, (INormalizedKeyComputerFactory) null,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, null);
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, null, 50);
RecordDescriptor sortDesc = scannerDesc;
String fileName = "scanMicroSortWrite.out";
@@ -836,7 +836,7 @@
RecordDescriptor sortDesc = scannerDesc;
InMemorySortRuntimeFactory sort =
new InMemorySortRuntimeFactory(new int[] { 3 }, (INormalizedKeyComputerFactory) null,
- new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, null);
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, null, 50);
// the group-by
NestedTupleSourceRuntimeFactory nts = new NestedTupleSourceRuntimeFactory();
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index e860288..08b2303 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.dataflow.common.io.RunFileReader;
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
+import org.apache.hyracks.util.annotations.CriticalPath;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -48,10 +49,12 @@
private final IBinaryComparator[] comparators;
private final INormalizedKeyComputer nmkComputer;
private final RecordDescriptor recordDesc;
- private final int framesLimit;
+ private final int maxMergeWidth;
private final int topK;
+ private List<GeneratedRunFileReader> partialRuns;
private List<GroupVSizeFrame> inFrames;
private VSizeFrame outputFrame;
+ private boolean first;
private static final Logger LOGGER = LogManager.getLogger();
public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
@@ -69,16 +72,15 @@
this.comparators = comparators;
this.nmkComputer = nmkComputer;
this.recordDesc = recordDesc;
- this.framesLimit = framesLimit;
+ this.maxMergeWidth = framesLimit - 1;
this.topK = topK;
+ this.first = true;
}
+ @CriticalPath
public void process(IFrameWriter finalWriter) throws HyracksDataException {
try {
- int maxMergeWidth = framesLimit - 1;
- inFrames = new ArrayList<>(maxMergeWidth);
- outputFrame = new VSizeFrame(ctx);
- List<GeneratedRunFileReader> partialRuns = new ArrayList<>(maxMergeWidth);
+ createReusableObjects();
int stop = runs.size();
currentGenerationRunAvailable.set(0, stop);
int numberOfPasses = 1;
@@ -210,6 +212,21 @@
}
}
+ public void reset(List<GeneratedRunFileReader> newRuns) {
+ this.runs.clear();
+ this.runs.addAll(newRuns);
+ this.currentGenerationRunAvailable.clear();
+ }
+
+ private void createReusableObjects() throws HyracksDataException {
+ if (first) {
+ first = false;
+ inFrames = new ArrayList<>(maxMergeWidth);
+ outputFrame = new VSizeFrame(ctx);
+ partialRuns = new ArrayList<>(maxMergeWidth);
+ }
+ }
+
public abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
throws HyracksDataException;