[NO ISSUE][COMP] Static range map in full parallel sort
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
- Support static range map in full parallel sort
- Add static range map option to ORDERED_PARTITIONED
structural property
- Add parseExpression() methods to SQL++ and AQL parsers
- Simplify RangeMapBuilder.parseHint()
Change-Id: I7eab6e6ede8c2dbb714a27801a76ad64bd9be1b8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3513
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-numeric-desc.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-numeric-desc.aql
index 4d2807c..0061a48 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-numeric-desc.aql
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-numeric-desc.aql
@@ -39,6 +39,8 @@
create dataset TwitterUsers(TwitterUserType)
primary key screen-name;
+set "compiler.sort.parallel" "true"
+
for $user in dataset TwitterUsers
/*+ range [400, 150, 100] */
order by $user.friends_count desc
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-numeric.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-numeric.aql
index 5916bcc..fdc9ee7 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-numeric.aql
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-numeric.aql
@@ -39,6 +39,8 @@
create dataset TwitterUsers(TwitterUserType)
primary key screen-name;
+set "compiler.sort.parallel" "true"
+
for $user in dataset TwitterUsers
/*+ range [100, 150, 400] */
order by $user.friends_count
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-string-desc.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-string-desc.aql
index f9b7a34..a338b7a 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-string-desc.aql
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-string-desc.aql
@@ -39,6 +39,8 @@
create dataset TwitterUsers(TwitterUserType)
primary key screen-name;
+set "compiler.sort.parallel" "true"
+
for $user in dataset TwitterUsers
/*+ range ["Nb", "F", "Ci"] */
order by $user.screen-name desc
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-string.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-string.aql
index 852939d..734f9d3 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-string.aql
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/range-connector/sort-hint-on-closed-string.aql
@@ -39,6 +39,8 @@
create dataset TwitterUsers(TwitterUserType)
primary key screen-name;
+set "compiler.sort.parallel" "true"
+
for $user in dataset TwitterUsers
/*+ range ["Ci", "F", "Nb"] */
order by $user.screen-name
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric-desc.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric-desc.plan
index cdedfde..27f74d7 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric-desc.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric-desc.plan
@@ -1,9 +1,9 @@
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- RANGE_PARTITION_MERGE_EXCHANGE [$$4(DESC)] SPLIT COUNT:3 |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$4(DESC)] |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- RANGE_PARTITION_EXCHANGE [$$4(DESC)] RANGE_MAP:{SPLIT:3} |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric.plan
index ba0dc6f..9dc7d67 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-numeric.plan
@@ -1,9 +1,9 @@
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- RANGE_PARTITION_MERGE_EXCHANGE [$$4(ASC)] SPLIT COUNT:3 |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$4(ASC)] |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- RANGE_PARTITION_EXCHANGE [$$4(ASC)] RANGE_MAP:{SPLIT:3} |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string-desc.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string-desc.plan
index 3faa5ec..84db03e 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string-desc.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string-desc.plan
@@ -1,9 +1,9 @@
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- RANGE_PARTITION_MERGE_EXCHANGE [$$3(DESC)] SPLIT COUNT:3 |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$3(DESC)] |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- RANGE_PARTITION_EXCHANGE [$$3(DESC)] RANGE_MAP:{SPLIT:3} |PARTITIONED|
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string.plan
index e0cffaa..edbcc0c 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/range-connector/sort-hint-on-closed-string.plan
@@ -1,7 +1,9 @@
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- RANGE_PARTITION_MERGE_EXCHANGE [$$3(ASC)] SPLIT COUNT:3 |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$3(ASC)] |PARTITIONED|
+ -- RANGE_PARTITION_EXCHANGE [$$3(ASC)] RANGE_MAP:{SPLIT:3} |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_01/order-by-exception_01.4.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_01/order-by-exception_01.4.query.aql
index 8667a4c..54dab06 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_01/order-by-exception_01.4.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/range-hints/order-by-exception_01/order-by-exception_01.4.query.aql
@@ -18,6 +18,8 @@
*/
use dataverse TinySocial;
+set "compiler.sort.parallel" "true"
+
for $user in dataset TwitterUsers
/*+ range ["Ci", "Nb", "F"] */
order by $user.screen-name
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_static_range_map/p_sort_static_range_map.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_static_range_map/p_sort_static_range_map.1.ddl.sqlpp
new file mode 100644
index 0000000..07c1995
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_static_range_map/p_sort_static_range_map.1.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: TODO
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type TestType as
+{
+ id: int,
+ f1: int
+};
+
+create dataset TestDS(TestType) primary key id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_static_range_map/p_sort_static_range_map.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_static_range_map/p_sort_static_range_map.2.update.sqlpp
new file mode 100644
index 0000000..5aacecf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_static_range_map/p_sort_static_range_map.2.update.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+INSERT INTO TestDS
+([
+{"id":1, "f1":40},
+{"id":2, "f1":40},
+{"id":3, "f1":40},
+{"id":4, "f1":41},
+{"id":5, "f1":42},
+{"id":6, "f1":42},
+{"id":7, "f1":40},
+{"id":8, "f1":41},
+{"id":9, "f1":41},
+{"id":10, "f1":42},
+{"id":11, "f1":40},
+{"id":12, "f1":41},
+{"id":13, "f1":42},
+{"id":14, "f1":41},
+{"id":15, "f1":40},
+{"id":16, "f1":42},
+{"id":17, "f1":42},
+{"id":18, "f1":41}
+]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_static_range_map/p_sort_static_range_map.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_static_range_map/p_sort_static_range_map.3.query.sqlpp
new file mode 100644
index 0000000..b4517e0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_static_range_map/p_sort_static_range_map.3.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+set `compiler.sort.parallel` "true";
+
+select value f1 from TestDS v
+/*+ range [41] */
+order by v.f1
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/p_sort_static_range_map/p_sort_static_range_map.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/p_sort_static_range_map/p_sort_static_range_map.3.adm
new file mode 100644
index 0000000..9bd5716
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/p_sort_static_range_map/p_sort_static_range_map.3.adm
@@ -0,0 +1,18 @@
+40
+40
+40
+40
+40
+40
+41
+41
+41
+41
+41
+41
+42
+42
+42
+42
+42
+42
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 5bc1650..c2c3f4c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6019,6 +6019,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="misc">
+ <compilation-unit name="p_sort_static_range_map">
+ <output-dir compare="Text">p_sort_static_range_map</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="misc">
<compilation-unit name="active_requests">
<output-dir compare="Text">active_requests</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 35c2ae8..3bb70c1 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -213,10 +213,6 @@
return s.substring(1).trim();
}
- private static IParser createNewParser(String statement) {
- return new AQLParser(statement);
- }
-
private static void checkBindingVariable(Expression returnExpression, VariableExpr var,
ILangExpression bodyExpression) throws ParseException {
if (returnExpression != null && var == null) {
@@ -286,8 +282,30 @@
}
public List<Statement> parse() throws CompilationException {
+ return parseImpl(new ParseFunction<List<Statement>>() {
+ @Override
+ public List<Statement> parse() throws ParseException {
+ return AQLParser.this.Statement();
+ }
+ });
+ }
+
+ private Expression parseExpression() throws CompilationException {
+ return parseImpl(new ParseFunction<Expression>() {
+ @Override
+ public Expression parse() throws ParseException {
+ return AQLParser.this.Expression();
+ }
+ });
+ }
+
+ private static Expression parseExpression(String text) throws CompilationException {
+ return new AQLParser(text).parseExpression();
+ }
+
+ private <T> T parseImpl(ParseFunction<T> parseFunction) throws CompilationException {
try {
- return Statement();
+ return parseFunction.parse();
} catch (Error e) {
// this is here as the JavaCharStream that's below the lexer somtimes throws Errors that are not handled
// by the ANTLR-generated lexer or parser (e.g it does this for invalid backslash u + 4 hex digits escapes)
@@ -296,6 +314,11 @@
throw new CompilationException(e.getMessage());
}
}
+
+ @FunctionalInterface
+ private interface ParseFunction<T> {
+ T parse() throws ParseException;
+ }
}
PARSER_END(AQLParser)
@@ -2421,10 +2444,9 @@
int numTuples = Integer.parseInt(splits[2]);
oc.setNumFrames(numFrames);
oc.setNumTuples(numTuples);
- }
- if (hint.startsWith(RANGE_HINT)) {
- try{
- oc.setRangeMap(RangeMapBuilder.parseHint(createNewParser(hint.substring(RANGE_HINT.length()))));
+ } else if (hint.startsWith(RANGE_HINT)) {
+ try {
+ oc.setRangeMap(RangeMapBuilder.parseHint(parseExpression(hint.substring(RANGE_HINT.length()))));
} catch (CompilationException e) {
throw new ParseException(e.getMessage());
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java
index a26c94d..89f1347 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java
@@ -28,9 +28,7 @@
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Expression.Kind;
-import org.apache.asterix.lang.common.base.IParser;
import org.apache.asterix.lang.common.base.Literal;
-import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.ListConstructor;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.literal.DoubleLiteral;
@@ -38,7 +36,6 @@
import org.apache.asterix.lang.common.literal.IntegerLiteral;
import org.apache.asterix.lang.common.literal.LongIntegerLiteral;
import org.apache.asterix.lang.common.literal.StringLiteral;
-import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.om.base.AMutableDouble;
import org.apache.asterix.om.base.AMutableFloat;
import org.apache.asterix.om.base.AMutableInt32;
@@ -60,34 +57,25 @@
private RangeMapBuilder() {
}
- public static RangeMap parseHint(IParser parser) throws CompilationException {
- ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
- DataOutput out = abvs.getDataOutput();
- abvs.reset();
-
- List<Statement> hintStatements = parser.parse();
- if (hintStatements.size() != 1) {
- throw new CompilationException("Only one range statement is allowed for the range hint.");
- }
-
- // Translate the query into a Range Map
- if (hintStatements.get(0).getKind() != Statement.Kind.QUERY) {
- throw new CompilationException("Not a proper query for the range hint.");
- }
- Query q = (Query) hintStatements.get(0);
-
- if (q.getBody().getKind() != Kind.LIST_CONSTRUCTOR_EXPRESSION) {
+ public static RangeMap parseHint(Expression expression) throws CompilationException {
+ if (expression.getKind() != Kind.LIST_CONSTRUCTOR_EXPRESSION) {
throw new CompilationException("The range hint must be a list.");
}
- List<Expression> el = ((ListConstructor) q.getBody()).getExprList();
+
+ ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
+ DataOutput out = abvs.getDataOutput();
+
+ List<Expression> el = ((ListConstructor) expression).getExprList();
int[] offsets = new int[el.size()];
// Loop over list of literals
for (int i = 0; i < el.size(); ++i) {
Expression item = el.get(i);
if (item.getKind() == Kind.LITERAL_EXPRESSION) {
- parseLiteralToBytes(item, out);
+ parseLiteralToBytes((LiteralExpr) item, out);
offsets[i] = abvs.getLength();
+ } else {
+ throw new CompilationException("Expected literal in the range hint");
}
// TODO Add support for composite fields.
}
@@ -96,7 +84,7 @@
}
@SuppressWarnings("unchecked")
- private static void parseLiteralToBytes(Expression item, DataOutput out) throws CompilationException {
+ private static void parseLiteralToBytes(LiteralExpr item, DataOutput out) throws CompilationException {
AMutableDouble aDouble = new AMutableDouble(0);
AMutableFloat aFloat = new AMutableFloat(0);
AMutableInt64 aInt64 = new AMutableInt64(0);
@@ -105,7 +93,7 @@
@SuppressWarnings("rawtypes")
ISerializerDeserializer serde;
- Literal l = ((LiteralExpr) item).getValue();
+ Literal l = item.getValue();
try {
switch (l.getLiteralType()) {
case DOUBLE:
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 62c041c..ead92cf 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -276,10 +276,6 @@
return s.substring(1).trim();
}
- private static IParser createNewParser(String statement) {
- return new SQLPPParser(statement);
- }
-
private Token getHintToken(Token t) {
return t.specialToken;
}
@@ -341,9 +337,31 @@
}
public List<Statement> parse() throws CompilationException {
+ return parseImpl(new ParseFunction<List<Statement>>() {
+ @Override
+ public List<Statement> parse() throws ParseException {
+ return SQLPPParser.this.Statement();
+ }
+ });
+ }
+
+ private Expression parseExpression() throws CompilationException {
+ return parseImpl(new ParseFunction<Expression>() {
+ @Override
+ public Expression parse() throws ParseException {
+ return SQLPPParser.this.Expression();
+ }
+ });
+ }
+
+ private static Expression parseExpression(String text) throws CompilationException {
+ return new SQLPPParser(text).parseExpression();
+ }
+
+ private <T> T parseImpl(ParseFunction<T> parseFunction) throws CompilationException {
warningCollector.clear();
try {
- return Statement();
+ return parseFunction.parse();
} catch (Error e) {
// this is here as the JavaCharStream that's below the lexer sometimes throws Errors that are not handled
// by the ANTLR-generated lexer or parser (e.g it does this for invalid backslash u + 4 hex digits escapes)
@@ -356,6 +374,11 @@
}
}
+ @FunctionalInterface
+ private interface ParseFunction<T> {
+ T parse() throws ParseException;
+ }
+
@Override
public void getWarnings(Collection<? super Warning> outWarnings) {
warningCollector.getWarnings(outWarnings);
@@ -3462,7 +3485,7 @@
oc.setNumTuples(numTuples);
} else if (hint.startsWith(RANGE_HINT)) {
try {
- oc.setRangeMap(RangeMapBuilder.parseHint(createNewParser(hint.substring(RANGE_HINT.length()))));
+ oc.setRangeMap(RangeMapBuilder.parseHint(parseExpression(hint.substring(RANGE_HINT.length()))));
} catch (CompilationException e) {
throw new SqlppParseException(getSourceLocation(getHintToken(token)), e.getMessage());
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index eaec772..f941bdb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -47,6 +47,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperator {
@@ -65,30 +66,30 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- // if (orderProps == null) { // to do caching, we need some mechanism to
- // invalidate cache
- computeLocalProperties(op);
- // }
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- StructuralPropertiesVector childProp = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties();
+ OrderOperator sortOp = (OrderOperator) op;
+ computeLocalProperties(sortOp);
+ AbstractLogicalOperator childOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ StructuralPropertiesVector childProp = (StructuralPropertiesVector) childOp.getDeliveredPhysicalProperties();
deliveredProperties = new StructuralPropertiesVector(childProp.getPartitioningProperty(),
Collections.singletonList(orderProp));
}
@Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator sortOp,
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent, IOptimizationContext ctx) {
- if (sortOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+ if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+ OrderOperator sortOp = (OrderOperator) op;
if (orderProp == null) {
computeLocalProperties(sortOp);
}
StructuralPropertiesVector[] requiredProp = new StructuralPropertiesVector[1];
IPartitioningProperty partitioning;
INodeDomain targetNodeDomain = ctx.getComputationNodeDomain();
- if (isFullParallel((AbstractLogicalOperator) sortOp, targetNodeDomain, ctx)) {
+ String fullParallelAnnotation = getFullParallelAnnotation(sortOp, targetNodeDomain, ctx);
+ if (fullParallelAnnotation != null) {
// partitioning requirement: input data is re-partitioned on sort columns (global ordering)
- // TODO(ali): static range map implementation should be fixed to require ORDERED_PARTITION and come here
- partitioning = new OrderedPartitionedProperty(Arrays.asList(sortColumns), targetNodeDomain);
+ RangeMap rangeMap = getRangeMap(sortOp, fullParallelAnnotation);
+ partitioning = new OrderedPartitionedProperty(Arrays.asList(sortColumns), targetNodeDomain, rangeMap);
} else {
// partitioning requirement: input data is unpartitioned (i.e. must be merged at one site)
partitioning = IPartitioningProperty.UNPARTITIONED;
@@ -101,9 +102,8 @@
}
}
- public void computeLocalProperties(ILogicalOperator op) {
- OrderOperator ord = (OrderOperator) op;
- List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+ public void computeLocalProperties(OrderOperator ord) {
+ List<OrderColumn> orderColumns = new ArrayList<>();
for (Pair<IOrder, Mutable<ILogicalExpression>> p : ord.getOrderExpressions()) {
ILogicalExpression expr = p.second.getValue();
if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
@@ -144,10 +144,11 @@
}
/**
- * When true, the sort operator requires ORDERED_PARTITION (only applicable to dynamic version for now).
+ * When a non-null value is returned, the sort operator requires ORDERED_PARTITION (applicable to both static and
+ * dynamic range versions).
* Conditions:
* 1. Execution mode == partitioned
- * 2. Dynamic range map was not disabled by some checks
+ * 2. Dynamic range map that was not disabled by some checks, or a static range map
* 3. User didn't disable it
* 4. User didn't provide static range map
* 5. Physical sort operator is not in-memory
@@ -155,15 +156,31 @@
* @param sortOp the sort operator
* @param clusterDomain the partitions specification of the cluster
* @param ctx optimization context
- * @return true if the sort operator should be full parallel sort, false otherwise.
+ * @return annotation name (non-null) if the sort operator should be full parallel sort, {@code null} otherwise.
*/
- private boolean isFullParallel(AbstractLogicalOperator sortOp, INodeDomain clusterDomain,
+ private String getFullParallelAnnotation(AbstractLogicalOperator sortOp, INodeDomain clusterDomain,
IOptimizationContext ctx) {
- return sortOp.getAnnotations().get(OperatorAnnotations.USE_DYNAMIC_RANGE) != Boolean.FALSE
- && !sortOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)
- && sortOp.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.STABLE_SORT
+ if (sortOp.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.STABLE_SORT
&& clusterDomain.cardinality() != null && clusterDomain.cardinality() > 1
- && ctx.getPhysicalOptimizationConfig().getSortParallel();
+ && ctx.getPhysicalOptimizationConfig().getSortParallel()) {
+ if (sortOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)) {
+ return OperatorAnnotations.USE_STATIC_RANGE;
+ } else if (sortOp.getAnnotations().get(OperatorAnnotations.USE_DYNAMIC_RANGE) != Boolean.FALSE) {
+ return OperatorAnnotations.USE_DYNAMIC_RANGE;
+ }
+ }
+ return null;
+ }
+
+ private RangeMap getRangeMap(ILogicalOperator sortOp, String fullParallelAnnotation) {
+ switch (fullParallelAnnotation) {
+ case OperatorAnnotations.USE_STATIC_RANGE:
+ return (RangeMap) sortOp.getAnnotations().get(fullParallelAnnotation);
+ case OperatorAnnotations.USE_DYNAMIC_RANGE:
+ return null;
+ default:
+ throw new IllegalStateException(fullParallelAnnotation);
+ }
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index aeb9ac7..bb0081a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -93,7 +93,7 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<>(partitioningFields), domain);
+ IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<>(partitioningFields), domain, rangeMap);
this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>());
}
@@ -133,7 +133,7 @@
@Override
public String toString() {
- final String splitCount = rangeMap == null ? "" : " SPLIT COUNT:" + Integer.toString(rangeMap.getSplitCount());
- return getOperatorTag().toString() + " " + partitioningFields + splitCount;
+ return getOperatorTag().toString() + " " + partitioningFields
+ + (rangeMap != null ? " RANGE_MAP:" + rangeMap : "");
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
index 922cdd3..15a2d8f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -84,7 +84,7 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain);
+ IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain, rangeMap);
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
@@ -145,7 +145,8 @@
@Override
public String toString() {
- return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+ return getOperatorTag().toString() + " " + partitioningFields
+ + (rangeMap != null ? " RANGE_MAP:" + rangeMap : "");
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index 7112ef7..0136996 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -25,15 +25,22 @@
import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
public class OrderedPartitionedProperty implements IPartitioningProperty {
private final List<OrderColumn> orderColumns;
+ private final RangeMap rangeMap;
private INodeDomain domain;
public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain) {
+ this(orderColumns, domain, null);
+ }
+
+ public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, RangeMap rangeMap) {
this.domain = domain;
this.orderColumns = orderColumns;
+ this.rangeMap = rangeMap;
}
public List<OrderColumn> getOrderColumns() {
@@ -55,7 +62,8 @@
@Override
public String toString() {
- return getPartitioningType().toString() + orderColumns + " domain:" + domain;
+ return getPartitioningType().toString() + orderColumns + " domain:" + domain
+ + (rangeMap != null ? " range-map:" + rangeMap : "");
}
@Override
@@ -63,7 +71,7 @@
List<FunctionalDependency> fds) {
List<OrderColumn> columns = PropertiesUtil.replaceOrderColumnsByEqClasses(orderColumns, equivalenceClasses);
columns = PropertiesUtil.applyFDsToOrderColumns(columns, fds);
- return new OrderedPartitionedProperty(columns, domain);
+ return new OrderedPartitionedProperty(columns, domain, rangeMap);
}
@Override
@@ -73,6 +81,10 @@
}
}
+ public RangeMap getRangeMap() {
+ return rangeMap;
+ }
+
@Override
public INodeDomain getNodeDomain() {
return domain;
@@ -97,11 +109,11 @@
}
newOrderColumns.add(new OrderColumn(newColumnVar, orderColumn.getOrder()));
}
- return applied ? new OrderedPartitionedProperty(newOrderColumns, domain) : this;
+ return applied ? new OrderedPartitionedProperty(newOrderColumns, domain, rangeMap) : this;
}
@Override
public IPartitioningProperty clonePartitioningProperty() {
- return new OrderedPartitionedProperty(new ArrayList<>(orderColumns), domain);
+ return new OrderedPartitionedProperty(new ArrayList<>(orderColumns), domain, rangeMap);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index 1c00e45..2c708c5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -163,6 +163,10 @@
case ORDERED_PARTITIONED: {
OrderedPartitionedProperty or = (OrderedPartitionedProperty) reqd;
OrderedPartitionedProperty od = (OrderedPartitionedProperty) dlvd;
+ //TODO: support non-null range maps
+ if (or.getRangeMap() != null || od.getRangeMap() != null) {
+ return false;
+ }
if (mayExpandProperties) {
return isPrefixOf(od.getOrderColumns().iterator(), or.getOrderColumns().iterator());
} else {
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 749a63c..e174ab8 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -71,7 +71,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SequentialMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortForwardPOperator;
@@ -450,7 +449,7 @@
return false;
}
AbstractStableSortPOperator sortOp = (AbstractStableSortPOperator) op.getPhysicalOperator();
- sortOp.computeLocalProperties(op);
+ sortOp.computeLocalProperties((OrderOperator) op);
ILocalStructuralProperty orderProp = sortOp.getOrderProperty();
return PropertiesUtil.matchLocalProperties(Collections.singletonList(orderProp), delivered.getLocalProperties(),
context.getEquivalenceClassMap(op), context.getFDList(op));
@@ -581,7 +580,7 @@
IPhysicalOperator pop;
switch (pp.getPartitioningType()) {
case UNPARTITIONED: {
- pop = createMergingConnector(op, domain, deliveredByChild);
+ pop = createMergingConnector(deliveredByChild);
break;
}
case UNORDERED_PARTITIONED: {
@@ -622,28 +621,18 @@
}
}
- private IPhysicalOperator createMergingConnector(ILogicalOperator parentOp, INodeDomain domain,
- IPhysicalPropertiesVector deliveredByChild) {
- IPhysicalOperator mergingConnector;
+ private IPhysicalOperator createMergingConnector(IPhysicalPropertiesVector deliveredByChild) {
List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild);
if (ordCols.isEmpty()) {
IPartitioningProperty partitioningDeliveredByChild = deliveredByChild.getPartitioningProperty();
if (partitioningDeliveredByChild.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED) {
- mergingConnector = new SequentialMergeExchangePOperator();
+ return new SequentialMergeExchangePOperator();
} else {
- mergingConnector = new RandomMergeExchangePOperator();
+ return new RandomMergeExchangePOperator();
}
} else {
- if (parentOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)) {
- RangeMap rangeMap = (RangeMap) parentOp.getAnnotations().get(OperatorAnnotations.USE_STATIC_RANGE);
- mergingConnector = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap);
- } else {
- OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
- sortColumns = ordCols.toArray(sortColumns);
- mergingConnector = new SortMergeExchangePOperator(sortColumns);
- }
+ return new SortMergeExchangePOperator(ordCols.toArray(new OrderColumn[ordCols.size()]));
}
- return mergingConnector;
}
private IPhysicalOperator createHashConnector(IOptimizationContext ctx, IPhysicalPropertiesVector deliveredByChild,
@@ -692,7 +681,6 @@
// options for range partitioning: 1. static range map, 2. dynamic range map computed at run time
List<OrderColumn> partitioningColumns = ((OrderedPartitionedProperty) requiredPartitioning).getOrderColumns();
if (parentOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)) {
- // TODO(ali): static range map implementation should be fixed to require ORDERED_PARTITION and come here.
RangeMap rangeMap = (RangeMap) parentOp.getAnnotations().get(OperatorAnnotations.USE_STATIC_RANGE);
return new RangePartitionExchangePOperator(partitioningColumns, domain, rangeMap);
} else {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
index 3719144..aaacceb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
@@ -126,4 +126,9 @@
return fields == other.fields && Arrays.equals(endOffsets, other.endOffsets)
&& Arrays.equals(bytes, other.bytes);
}
+
+ @Override
+ public String toString() {
+ return "{SPLIT:" + getSplitCount() + '}';
+ }
}