Merge branch 'gerrit/march-hare'
Change-Id: I7889131305f3a408819c0b9100316f78801d2941
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-2700.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-2700.sqlpp
new file mode 100644
index 0000000..80ada37
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-2700.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE bigfun IF EXISTS;
+CREATE DATAVERSE bigfun;
+USE bigfun;
+
+CREATE TYPE GleambookUserType AS { gb: int32, id: string };
+CREATE TYPE GleambookMessageType AS { gb: int32, message_id: string };
+
+CREATE DATASET GleambookUsersComposite(GleambookUserType) PRIMARY KEY gb,id;
+CREATE DATASET GleambookMessagesComposite(GleambookMessageType) PRIMARY KEY gb,message_id;
+
+CREATE INDEX usrSinceIx ON GleambookUsersComposite(user_since: string);
+CREATE INDEX authorIdIx ON GleambookMessagesComposite(author_id: string);
+/* ASTERIXDB-2700: ORDER BY subquery feeding an indexnl join with parallel sort disabled; expected plan is results/query-ASTERIXDB-2700.plan */
+SET `compiler.sort.parallel` "false";
+
+FROM (SELECT VALUE u
+ FROM GleambookUsersComposite u
+ WHERE u.user_since >= '2008-07-22T00:00:00'
+ ORDER BY u.id) AS user, GleambookMessagesComposite AS msg
+WHERE msg.author_id /*+ indexnl */ = user.id
+SELECT user.name AS uname, msg.message AS message;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2700.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2700.plan
new file mode 100644
index 0000000..3cd606b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-2700.plan
@@ -0,0 +1,35 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$56(ASC), $$57(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- SORT_MERGE_EXCHANGE [$$42(ASC) ] |PARTITIONED|
+ -- STABLE_SORT [$$42(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$53(ASC), $$54(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_join_low_memory_err/big_object_join.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_join_low_memory_err/big_object_join.1.ddl.sqlpp
new file mode 100644
index 0000000..80d8d19
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_join_low_memory_err/big_object_join.1.ddl.sqlpp
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
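+/*
+* Description : Create external datasets whose comment fields are long; 10% of the records have 32K-size comments.
+* This triggers the VSizeFrame path; the query step then joins them with a deliberately small join memory budget.
+* Expected Res : Failure - the join query fails with HYR0123 (insufficient memory for the join operators)
+* Date : Jun 16 2015
+*/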
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+/* Closed record types for the '|'-delimited .tbl files read by the external datasets below. */
+CREATE TYPE test.LineType AS CLOSED {
+  l_orderkey: integer,
+  l_partkey: integer,
+  l_suppkey: integer,
+  l_linenumber: integer,
+  l_quantity: double,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+};
+
+CREATE TYPE test.OrderType AS CLOSED {
+  o_orderkey: integer,
+  o_custkey: integer,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: integer,
+  o_comment: string
+};
+
+CREATE TYPE test.CustomerType AS CLOSED {
+  c_custkey: integer,
+  c_name: string,
+  c_address: string,
+  c_nationkey: integer,
+  c_phone: string,
+  c_acctbal: double,
+  c_mktsegment: string,
+  c_comment: string
+};
+
+/* External datasets over the big-object files on asterix_nc1; nothing is loaded into storage. */
+CREATE EXTERNAL DATASET Line(LineType) USING `localfs`
+    ((`path`=`asterix_nc1://data/big-object/lineitem.tbl.big`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+CREATE EXTERNAL DATASET `Order`(OrderType) USING `localfs`
+    ((`path`=`asterix_nc1://data/big-object/order.tbl.verylong.big`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+CREATE EXTERNAL DATASET Customer(CustomerType) USING `localfs`
+    ((`path`=`asterix_nc1://data/big-object/customer.tbl.big`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_join_low_memory_err/big_object_join.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_join_low_memory_err/big_object_join.2.update.sqlpp
new file mode 100644
index 0000000..00a13e7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_join_low_memory_err/big_object_join.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
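+/*
+* Description : Intentionally empty update step - the datasets created by the DDL step are external, so nothing is loaded.
+* Their comment fields are long (10% of the records have 32K-size comments), which triggers the VSizeFrame path.
+* Expected Res : Success for this step; the join query of this test case is expected to fail with HYR0123
+* Date : Jun 16 2015
+*/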
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_join_low_memory_err/big_object_join.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_join_low_memory_err/big_object_join.3.query.sqlpp
new file mode 100644
index 0000000..0dd36b8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_join_low_memory_err/big_object_join.3.query.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Join the external datasets, whose comment fields are long (10% of the records have 32K-size comments),
+* with a deliberately small join memory budget (160KB). This triggers the VSizeFrame path.
+* Expected Res : Failure - HYR0123: insufficient memory is provided for the join operators
+* Date : Jun 16 2015
+*/
+/* Deliberately tiny join budget: this join is expected to fail with HYR0123 (insufficient join memory). */
+SET `compiler.joinmemory` "160KB";
+USE test;
+
+SELECT ELEMENT {'c_custkey': c.c_custkey, 'o_orderkey': o.o_orderkey,
+                'len_c_comment': test.`string-length`(c.c_comment), 'len_o_comment': test.`string-length`(o.o_comment),
+                'c_comment': c.c_comment}
+FROM Customer AS c, `Order` AS o
+WHERE c.c_custkey = o.o_custkey
+ORDER BY o.o_orderkey, c.c_custkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.1.ddl.sqlpp
new file mode 100644
index 0000000..4572164
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.1.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: This test case is to verify the fix for ASTERIXDB-2700
+ */
+
+DROP DATAVERSE bigfun IF EXISTS;
+CREATE DATAVERSE bigfun;
+USE bigfun;
+/* Only the key fields are declared; name, user_since, author_id and message are open fields supplied by the inserts. */
+CREATE TYPE GleambookUserType AS OPEN { gb: int32, id: string };
+CREATE TYPE GleambookMessageType AS OPEN { gb: int32, message_id: string };
+
+CREATE DATASET GleambookUsersComposite(GleambookUserType) PRIMARY KEY gb, id;
+CREATE DATASET GleambookMessagesComposite(GleambookMessageType) PRIMARY KEY gb, message_id;
+/* Secondary indexes on the open fields user_since and author_id. */
+CREATE INDEX usrSinceIx ON GleambookUsersComposite(user_since: string);
+CREATE INDEX authorIdIx ON GleambookMessagesComposite(author_id: string);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.2.update.sqlpp
new file mode 100644
index 0000000..43bd629
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.2.update.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE bigfun;
+
+INSERT INTO GleambookUsersComposite [
+  {"gb": 1, "id": "1", "name": "name1", "user_since": "2010-07-22T00:00:00"},
+  {"gb": 1, "id": "2", "name": "name2", "user_since": "2011-07-22T00:00:00"},
+  {"gb": 2, "id": "3", "name": "name3", "user_since": "2010-09-22T00:00:00"},
+  {"gb": 2, "id": "4", "name": "name4", "user_since": "2010-10-22T00:00:00"},
+  {"gb": 3, "id": "5", "name": "name5", "user_since": "2013-07-22T00:00:00"}
+];
+/* author_id refers to GleambookUsersComposite.id; users "3" and "4" have no messages. */
+INSERT INTO GleambookMessagesComposite [
+  {"gb": 1, "message_id": "1", "author_id": "1", "message": "message1_1"},
+  {"gb": 1, "message_id": "2", "author_id": "1", "message": "message2_1"},
+  {"gb": 1, "message_id": "3", "author_id": "1", "message": "message3_1"},
+  {"gb": 2, "message_id": "4", "author_id": "2", "message": "message1_2"},
+  {"gb": 2, "message_id": "5", "author_id": "2", "message": "message2_2"},
+  {"gb": 3, "message_id": "6", "author_id": "2", "message": "message3_2"},
+  {"gb": 3, "message_id": "7", "author_id": "5", "message": "message1_5"},
+  {"gb": 3, "message_id": "8", "author_id": "5", "message": "message2_5"}
+];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.3.query.sqlpp
new file mode 100644
index 0000000..9d6dc06
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.3.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE bigfun;
+/* ASTERIXDB-2700: sorted subquery joined to the messages via an indexnl-hinted predicate, with parallel sort disabled. */
+SET `compiler.sort.parallel` "false";
+
+FROM (SELECT VALUE u
+      FROM GleambookUsersComposite u
+      WHERE u.user_since >= '2008-07-22T00:00:00'
+      ORDER BY u.id) AS user, GleambookMessagesComposite AS msg
+WHERE msg.author_id /*+ indexnl */ = user.id
+SELECT user.name AS uname, msg.message AS message
+ORDER BY uname, message;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.3.adm
new file mode 100644
index 0000000..56bcafa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2700/query-ASTERIXDB-2700.3.adm
@@ -0,0 +1,8 @@
+{ "uname": "name1", "message": "message1_1" }
+{ "uname": "name1", "message": "message2_1" }
+{ "uname": "name1", "message": "message3_1" }
+{ "uname": "name2", "message": "message1_2" }
+{ "uname": "name2", "message": "message2_2" }
+{ "uname": "name2", "message": "message3_2" }
+{ "uname": "name5", "message": "message1_5" }
+{ "uname": "name5", "message": "message2_5" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index aa5fc5d..484a7da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6458,6 +6458,11 @@
<output-dir compare="Text">insert_nulls_with_secondary_idx</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="misc">
+ <compilation-unit name="query-ASTERIXDB-2700">
+ <output-dir compare="Text">query-ASTERIXDB-2700</output-dir>
+ </compilation-unit>
+ </test-case>
<!--
<test-case FilePath="misc">
<compilation-unit name="query-ASTERIXDB-1203">
@@ -12367,6 +12372,13 @@
</compilation-unit>
</test-case>
<test-case FilePath="big-object">
+ <compilation-unit name="big_object_join_low_memory_err">
+ <output-dir compare="Text">big_object_join</output-dir>
+ <expected-error>HYR0123: Insufficient memory is provided for the join operators, please increase the join memory budget.</expected-error>
+ <source-location>false</source-location>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="big-object">
<compilation-unit name="big_object_load_20M">
<output-dir compare="Text">big_object_load_20M</output-dir>
</compilation-unit>
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 9236545..706028b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -50,6 +50,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
@@ -63,6 +64,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreSortedDistinctByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
@@ -284,18 +286,18 @@
printOp(op, context);
}
changed = true;
- AbstractLogicalOperator nextOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- if (nextOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
- nextOp = (AbstractLogicalOperator) nextOp.getInputs().get(0).getValue();
- }
- opRef.setValue(nextOp);
- // Now, transfer annotations from the original sort op. to this one.
- AbstractLogicalOperator transferTo = nextOp;
- if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
- // remove duplicate exchange operator
- transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
- }
- transferTo.getAnnotations().putAll(op.getAnnotations());
+ // replace the sort with empty assign (to handle cases where the sort might be sitting between exchanges)
+ // RemoveUnusedAssignAndAggregateRule should run after and decide whether to remove the assign or keep it
+ AssignOperator assignOperator = new AssignOperator(new ArrayList<>(0), new ArrayList<>(0));
+ AssignPOperator assignPOperator = new AssignPOperator();
+ assignOperator.setSourceLocation(opRef.getValue().getSourceLocation());
+ assignOperator.setPhysicalOperator(assignPOperator);
+ assignOperator.getInputs().addAll(op.getInputs());
+ opRef.setValue(assignOperator);
+ OperatorManipulationUtil.setOperatorMode(assignOperator);
+ OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(assignOperator, context);
+ context.computeAndSetTypeEnvironmentForOperator(assignOperator);
+ PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(assignOperator, context);
physOptimizeOp(opRef, required, nestedPlan, context);
}
return changed;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index eba3c91..a563f46 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -90,7 +90,7 @@
// we try to remove these operators if the produced variables from these
// operators are not used.
if (!assignedVarMap.isEmpty()) {
- removeUnusedAssigns(opRef, context);
+ removeUnusedAssigns(opRef, false, null, context);
}
return isTransformed;
@@ -139,8 +139,19 @@
return assignVarsSetForThisOp;
}
- private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
+ /**
+ * Removes the assigned variables of an operator (left-hand side variables) if they are not used. It also removes
+ * the operator altogether when the operator is not assigning any more variables after removal of the variables
+ * (Except for few specific operators which cannot be removed such as UNIONALL).
+ *
+ * @param opRef the operator from which the assigned variables are to be removed.
+ * @param opInSubplan whether the operator is inside a subplan.
+ * @param parentOp the parent operator of {@code opRef} or null if it does not have one.
+ * @param context the optimization context.
+ * @throws AlgebricksException
+ */
+ private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, boolean opInSubplan, ILogicalOperator parentOp,
+ IOptimizationContext context) throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
@@ -148,8 +159,7 @@
while (removeFromAssigns(op, assignVarsSetForThisOp, context) == 0) {
// UnionAllOperator cannot be removed since it has two branches.
- if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE
- || op.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
+ if (!canRemoveOperator(op, opInSubplan, parentOp)) {
break;
}
op = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
@@ -161,7 +171,7 @@
Iterator<Mutable<ILogicalOperator>> childIter = op.getInputs().iterator();
while (childIter.hasNext()) {
Mutable<ILogicalOperator> cRef = childIter.next();
- removeUnusedAssigns(cRef, context);
+ removeUnusedAssigns(cRef, opInSubplan, op, context);
}
if (op.hasNestedPlans()) {
@@ -170,7 +180,7 @@
while (planIter.hasNext()) {
ILogicalPlan p = planIter.next();
for (Mutable<ILogicalOperator> r : p.getRoots()) {
- removeUnusedAssigns(r, context);
+ removeUnusedAssigns(r, true, null, context);
}
}
@@ -420,6 +430,22 @@
}
}
+ private static boolean canRemoveOperator(ILogicalOperator op, boolean opInsideSubplan, ILogicalOperator parentOp) {
+     // AGGREGATE and UNIONALL (which has two branches) are never removed, whether or not they are in a subplan
+     LogicalOperatorTag tag = op.getOperatorTag();
+     if (tag == LogicalOperatorTag.AGGREGATE || tag == LogicalOperatorTag.UNIONALL) {
+         return false;
+     }
+     if (opInsideSubplan) {
+         return true;
+     }
+     // outer plan: keep the operator when its input is an exchange and it is the root or its parent is an exchange
+     boolean inputIsExchange =
+             op.hasInputs() && op.getInputs().get(0).getValue().getOperatorTag() == LogicalOperatorTag.EXCHANGE;
+     boolean parentIsRootOrExchange = parentOp == null || parentOp.getOperatorTag() == LogicalOperatorTag.EXCHANGE;
+     return !(inputIsExchange && parentIsRootOrExchange);
+ }
+
private void clear() {
assignedVarMap.clear();
assignedVarSet.clear();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 5716262..38f49dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -156,6 +156,7 @@
public static final int EOF = 120;
public static final int NUMERIC_PROMOTION_ERROR = 121;
public static final int ERROR_PRINTING_PLAN = 122;
+ public static final int INSUFFICIENT_MEMORY = 123;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index fcbb6bb..cfb6c9e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -139,6 +139,7 @@
120 = End of file
121 = A numeric type promotion error has occurred: %1$s
122 = Encountered an error while printing the plan
+123 = Insufficient memory is provided for the join operators, please increase the join memory budget.
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index dba21d6..3d3bc7f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -150,8 +151,7 @@
while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
int victimPartition = spillPolicy.selectVictimPartition(pid);
if (victimPartition < 0) {
- throw new HyracksDataException(
- "No more space left in the memory buffer, please assign more memory to hash-join.");
+ throw HyracksDataException.create(ErrorCode.INSUFFICIENT_MEMORY);
}
spillPartition(victimPartition);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 2a7b47e..4cae09f 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -19,9 +19,7 @@
package org.apache.hyracks.http.server;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -71,7 +69,7 @@
private final AtomicInteger threadId = new AtomicInteger();
private final ConcurrentMap<String, Object> ctx;
private final LinkedBlockingQueue<Runnable> workQueue;
- private final List<IServlet> servlets;
+ private final ServletRegistry servlets;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final InetSocketAddress address;
@@ -100,7 +98,7 @@
this.closedHandler = closeHandler;
this.config = config;
ctx = new ConcurrentHashMap<>();
- servlets = new ArrayList<>();
+ servlets = new ServletRegistry();
workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
int numExecutorThreads = config.getThreadCount();
executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
@@ -219,21 +217,14 @@
}
public void addServlet(IServlet let) {
- servlets.add(let);
+ servlets.register(let);
+ }
+
+ public Set<IServlet> getServlets() {
+ return servlets.getServlets();
}
protected void doStart() throws InterruptedException {
- /*
- * This is a hacky way to ensure that IServlets with more specific paths are checked first.
- * For example:
- * "/path/to/resource/"
- * is checked before
- * "/path/to/"
- * which in turn is checked before
- * "/path/"
- * Note that it doesn't work for the case where multiple paths map to a single IServlet
- */
- Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
channel = bind();
}
@@ -341,44 +332,7 @@
}
public IServlet getServlet(FullHttpRequest request) {
- String uri = request.uri();
- int i = uri.indexOf('?');
- if (i >= 0) {
- uri = uri.substring(0, i);
- }
- for (IServlet servlet : servlets) {
- for (String path : servlet.getPaths()) {
- if (match(path, uri)) {
- return servlet;
- }
- }
- }
- return null;
- }
-
- static boolean match(String pathSpec, String path) {
- char c = pathSpec.charAt(0);
- if (c == '/') {
- if (pathSpec.equals(path) || (pathSpec.length() == 1 && path.isEmpty())) {
- return true;
- }
- if (isPathWildcardMatch(pathSpec, path)) {
- return true;
- }
- } else if (c == '*') {
- return path.regionMatches(path.length() - pathSpec.length() + 1, pathSpec, 1, pathSpec.length() - 1);
- }
- return false;
- }
-
- static boolean isPathWildcardMatch(String pathSpec, String path) {
- final int length = pathSpec.length();
- if (length < 2) {
- return false;
- }
- final int cpl = length - 2;
- final boolean b = pathSpec.endsWith("/*") && path.regionMatches(0, pathSpec, 0, cpl);
- return b && (path.length() == cpl || '/' == path.charAt(cpl));
+ return servlets.getServlet(request.uri());
}
protected HttpServerHandler<? extends HttpServer> createHttpHandler(int chunkSize) {
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 4882572..0f4ce8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Consumer;
import org.apache.hyracks.http.api.IChannelClosedHandler;
import org.apache.hyracks.http.api.IServlet;
@@ -43,6 +44,7 @@
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpScheme;
@@ -119,11 +121,19 @@
}
protected void respond(ChannelHandlerContext ctx, HttpRequest request, HttpResponseStatus status) {
+ respond(ctx, request, status, null);
+ }
+
+ protected void respond(ChannelHandlerContext ctx, HttpRequest request, HttpResponseStatus status,
+ Consumer<HttpResponse> beforeWrite) {
final DefaultHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), status);
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 0);
HttpUtil.setConnectionHeader(request, response);
final ChannelPromise responseCompletionPromise = ctx.newPromise();
responseCompletionPromise.addListener(this);
+ if (beforeWrite != null) {
+ beforeWrite.accept(response);
+ }
final ChannelFuture clientChannel = ctx.writeAndFlush(response, responseCompletionPromise);
if (!io.netty.handler.codec.http.HttpUtil.isKeepAlive(request)) {
clientChannel.addListener(ChannelFutureListener.CLOSE);
@@ -160,7 +170,8 @@
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("No servlet for " + request.uri());
}
- respond(ctx, request, HttpResponseStatus.NOT_FOUND);
+ respond(ctx, request, HttpResponseStatus.NOT_FOUND,
+ response -> response.headers().set(HttpUtil.PERMANENT, "true"));
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ServletRegistry.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ServletRegistry.java
new file mode 100644
index 0000000..083c0ff
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ServletRegistry.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.server;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@NotThreadSafe // thread safe for concurrent reads, but concurrent registrations are not supported
+public class ServletRegistry {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    /*
+     * Path specs are kept in reverse lexicographic order so that, when one spec is a prefix of another
+     * (e.g. "/path/" and "/path/to/"), the longer and more specific one is tried first by getServlet.
+     * NOTE(review): characters sorting below '*' (e.g. '%', '$', '!') defeat this for wildcard siblings:
+     * "/x/%41" sorts after "/x/*", so the "/x/*" servlet wins for that URI. Confirm this is acceptable.
+     */
+    private final SortedMap<String, IServlet> servletMap = new TreeMap<>(Comparator.reverseOrder());
+    private final Set<IServlet> servlets = new HashSet<>();
+
+    /**
+     * Registers {@code let} under every path spec returned by {@link IServlet#getPaths()}.
+     * <p>
+     * All path specs are validated before any state is modified, so a rejected registration leaves the
+     * registry unchanged: no partially registered servlet and no overwritten mapping.
+     *
+     * @param let the servlet to register
+     * @throws IllegalArgumentException if one of the path specs is empty (it could never match a request)
+     * @throws IllegalStateException if a path spec is already mapped, or is repeated within
+     *         {@code let.getPaths()}
+     */
+    public void register(IServlet let) {
+        final String[] paths = let.getPaths();
+        // validate every spec up front; nothing after this loop can fail
+        final Set<String> claimed = new HashSet<>();
+        for (String path : paths) {
+            if (path.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "empty servlet path! (servlet = " + let + "[" + let.getClass().getName() + "])");
+            }
+            // a spec repeated within this servlet's own paths conflicts with the servlet itself
+            IServlet prev = claimed.add(path) ? servletMap.get(path) : let;
+            if (prev != null) {
+                throw new IllegalStateException("duplicate servlet mapping! (path = " + path + ", orig = " + prev + "["
+                        + prev.getClass().getName() + "], dup = " + let + "[" + let.getClass().getName() + "])");
+            }
+        }
+        servlets.add(let);
+        for (String path : paths) {
+            LOGGER.debug("registering servlet {}[{}] with path {}", let, let.getClass().getName(), path);
+            servletMap.put(path, let);
+        }
+    }
+
+    /**
+     * @return a read-only view of all registered servlets
+     */
+    public Set<IServlet> getServlets() {
+        return Collections.unmodifiableSet(servlets);
+    }
+
+    /**
+     * Looks up the servlet for a request URI. The query string is stripped first; the first path spec,
+     * in the map's reverse lexicographic order, that matches the remaining path wins.
+     *
+     * @param uri the request URI, possibly with a {@code ?query} suffix
+     * @return the matching servlet, or {@code null} if no path spec matches
+     */
+    public IServlet getServlet(String uri) {
+        String baseUri = HttpUtil.trimQuery(uri);
+        return servletMap.entrySet().stream().filter(entry -> match(entry.getKey(), baseUri)).map(Map.Entry::getValue)
+                .findFirst().orElse(null);
+    }
+
+    /**
+     * Tests whether a request path matches a path spec. Supported spec forms are an exact path
+     * ({@code "/a"}; {@code "/"} also matches the empty path), a prefix wildcard ({@code "/a/*"}) and a
+     * suffix wildcard ({@code "*.html"}). An empty or otherwise unrecognized spec never matches.
+     */
+    static boolean match(String pathSpec, String path) {
+        if (pathSpec.isEmpty()) {
+            return false;
+        }
+        char c = pathSpec.charAt(0);
+        if (c == '/') {
+            if (pathSpec.equals(path) || (pathSpec.length() == 1 && path.isEmpty())) {
+                return true;
+            }
+            return isPathWildcardMatch(pathSpec, path);
+        } else if (c == '*') {
+            return path.regionMatches(path.length() - pathSpec.length() + 1, pathSpec, 1, pathSpec.length() - 1);
+        }
+        return false;
+    }
+
+    /**
+     * @return {@code true} if {@code pathSpec} has the form {@code <prefix>/*} and {@code path} is either
+     *         exactly {@code <prefix>} or starts with {@code <prefix>/}
+     */
+    static boolean isPathWildcardMatch(String pathSpec, String path) {
+        final int length = pathSpec.length();
+        if (length < 2) {
+            return false;
+        }
+        final int cpl = length - 2;
+        final boolean b = pathSpec.endsWith("/*") && path.regionMatches(0, pathSpec, 0, cpl);
+        return b && (path.length() == cpl || '/' == path.charAt(cpl));
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 8403dce..46d429a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -53,33 +53,11 @@
private static final Charset DEFAULT_RESPONSE_CHARSET = StandardCharsets.UTF_8;
public static final AsciiString X_FORWARDED_PROTO = AsciiString.cached("x-forwarded-proto");
+ public static final AsciiString PERMANENT = AsciiString.cached("permanent");
private HttpUtil() {
}
- public static class Encoding {
- public static final String UTF8 = "utf-8";
-
- private Encoding() {
- }
- }
-
- public static class ContentType {
- public static final String ADM = "adm";
- public static final String JSON = "json";
- public static final String CSV = "csv";
- public static final String APPLICATION_ADM = "application/x-adm";
- public static final String APPLICATION_JSON = "application/json";
- public static final String APPLICATION_X_WWW_FORM_URLENCODED = "application/x-www-form-urlencoded";
- public static final String TEXT_CSV = "text/csv";
- public static final String IMG_PNG = "image/png";
- public static final String TEXT_HTML = "text/html";
- public static final String TEXT_PLAIN = "text/plain";
-
- private ContentType() {
- }
- }
-
public static String getParameter(Map<String, List<String>> parameters, CharSequence name) {
List<String> parameter = parameters.get(String.valueOf(name));
return parameter == null ? null : String.join(",", parameter);
@@ -201,6 +179,34 @@
return preferredCharset.orElse(defaultCharset);
}
+    public static String trimQuery(String uri) { // returns uri without its "?query" suffix, if it has one
+        final int queryStart = uri.indexOf('?');
+        return queryStart >= 0 ? uri.substring(0, queryStart) : uri;
+    }
+
+    public static class Encoding { // character-encoding name constants
+        public static final String UTF8 = "utf-8";
+
+        private Encoding() { // constants holder; not meant to be instantiated
+        }
+    }
+
+    public static class ContentType { // short format names (adm/json/csv) and MIME type strings
+        public static final String ADM = "adm";
+        public static final String JSON = "json";
+        public static final String CSV = "csv";
+        public static final String APPLICATION_ADM = "application/x-adm";
+        public static final String APPLICATION_JSON = "application/json";
+        public static final String APPLICATION_X_WWW_FORM_URLENCODED = "application/x-www-form-urlencoded";
+        public static final String TEXT_CSV = "text/csv";
+        public static final String IMG_PNG = "image/png";
+        public static final String TEXT_HTML = "text/html";
+        public static final String TEXT_PLAIN = "text/plain";
+
+        private ContentType() { // constants holder; not meant to be instantiated
+        }
+    }
+
private static class WeightedHeaderValue implements Comparable<WeightedHeaderValue> {
final String value;
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/server/PathMatchTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/server/PathMatchTest.java
index 89260c5..9f9e174 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/server/PathMatchTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/server/PathMatchTest.java
@@ -24,18 +24,18 @@
public class PathMatchTest {
@Test
public void test() {
- Assert.assertTrue(HttpServer.match("/", ""));
+ Assert.assertTrue(ServletRegistry.match("/", ""));
- Assert.assertTrue(HttpServer.match("/", "/"));
- Assert.assertFalse(HttpServer.match("/", "/a"));
+ Assert.assertTrue(ServletRegistry.match("/", "/"));
+ Assert.assertFalse(ServletRegistry.match("/", "/a"));
- Assert.assertFalse(HttpServer.match("/a", "/"));
- Assert.assertTrue(HttpServer.match("/a", "/a"));
+ Assert.assertFalse(ServletRegistry.match("/a", "/"));
+ Assert.assertTrue(ServletRegistry.match("/a", "/a"));
- Assert.assertTrue(HttpServer.match("/*", "/"));
- Assert.assertTrue(HttpServer.match("/*", "/a"));
+ Assert.assertTrue(ServletRegistry.match("/*", "/"));
+ Assert.assertTrue(ServletRegistry.match("/*", "/a"));
- Assert.assertFalse(HttpServer.match("/a/*", "/"));
- Assert.assertTrue(HttpServer.match("/a/*", "/a"));
+ Assert.assertFalse(ServletRegistry.match("/a/*", "/"));
+ Assert.assertTrue(ServletRegistry.match("/a/*", "/a"));
}
}