ASTERIXDB-1791, ASTERIXDB-1796: fix failure handling in runtime operators.
This change includes the following parts:
- Fix the implementation of fail() and close() in several runtime operators
to avoid file handle leak and job hang;
- Add an erase method to RunFileWriter which closes files before deleting
them in order release the holding disk space;
- Call RunFileWriter.close() and RunFileReader.close() in "finally" blocks.
- Fix RunFileReader to not truncate files to be deleted - it is not the root
cause of un-released disk space - open deleted files are the root cuase;
- Check file handle leaks in LangExecutionUtil.tearDown().
Change-Id: I203168171e6dac16b57d2eda960823e3810e22a3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1513
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index a3d6102..d04217c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -83,47 +83,53 @@
TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
- partitioner.open();
- FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
List<TestFrameWriter> recipients = new ArrayList<>();
- for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
- recipients.add((TestFrameWriter) writer);
- }
- partitioner.flush();
- for (TestFrameWriter writer : recipients) {
- Assert.assertEquals(writer.nextFrameCount(), 1);
- fta.reset(writer.getLastFrame());
- Assert.assertEquals(fta.getTupleCount(), 1);
- FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
- Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
- MessagingFrameTupleAppender.getMessageType(tempBuffer));
- }
- message.getBuffer().clear();
- message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
- message.getBuffer().flip();
- partitioner.flush();
- for (TestFrameWriter writer : recipients) {
- Assert.assertEquals(writer.nextFrameCount(), 2);
- fta.reset(writer.getLastFrame());
- Assert.assertEquals(fta.getTupleCount(), 1);
- FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
- Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
- MessagingFrameTupleAppender.getMessageType(tempBuffer));
- }
+ try {
+ partitioner.open();
+ FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+ for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
+ recipients.add((TestFrameWriter) writer);
+ }
+ partitioner.flush();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 1);
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 1);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
+ message.getBuffer().clear();
+ message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+ message.getBuffer().flip();
+ partitioner.flush();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 2);
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 1);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
- message.getBuffer().clear();
- message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
- message.getBuffer().flip();
- partitioner.flush();
- for (TestFrameWriter writer : recipients) {
- Assert.assertEquals(writer.nextFrameCount(), 3);
- fta.reset(writer.getLastFrame());
- Assert.assertEquals(fta.getTupleCount(), 1);
- FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
- Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
- MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ message.getBuffer().clear();
+ message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+ message.getBuffer().flip();
+ partitioner.flush();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 3);
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 1);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
+ } catch (Throwable t) {
+ partitioner.fail();
+ throw t;
+ } finally {
+ partitioner.close();
}
- partitioner.close();
for (TestFrameWriter writer : recipients) {
Assert.assertEquals(writer.nextFrameCount(), 4);
Assert.assertEquals(writer.closeCount(), 1);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 4db00c8..0e6be0f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -19,7 +19,12 @@
package org.apache.asterix.test.runtime;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -28,7 +33,9 @@
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -63,6 +70,9 @@
}
public static void tearDown() throws Exception {
+ // Check whether there are leaked open run file handles.
+ checkRunFileLeaks();
+
TestLibrarian.removeLibraryDir();
ExecutionTestUtil.tearDown(cleanupOnStop);
ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
@@ -117,4 +127,22 @@
System.err.flush();
}
}
+
+ private static void checkRunFileLeaks() throws IOException {
+ if (SystemUtils.IS_OS_WINDOWS) {
+ return;
+ }
+ // Only run the check on Linux and MacOS.
+ RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+ String processName = runtimeMXBean.getName();
+ String processId = processName.split("@")[0];
+
+ // Checks whether there are leaked run files from operators.
+ Process process = Runtime.getRuntime()
+ .exec(new String[] { "bash", "-c", "lsof -p " + processId + "|grep waf|wc -l" });
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+ int runFileCount = Integer.parseInt(reader.readLine().trim());
+ Assert.assertTrue(runFileCount == 0);
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql
index 1757130..9b2e66d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql
@@ -18,11 +18,11 @@
*/
use dataverse tpch;
-
+
for $l in dataset('LineItem')
//where inject-failure($l.l_shipdate <= '1998-09-02', $l.l_orderkey=5999)
/*+ hash*/
-group by $l_returnflag := $l.l_returnflag, $l_linestatus := $l.l_linestatus
+group by $l_returnflag := $l.l_returnflag, $l_linestatus := $l.l_linestatus
with $l
order by $l_returnflag, $l_linestatus
return {
@@ -32,8 +32,8 @@
"sum_base_price": sum(for $i in $l return $i.l_extendedprice),
"sum_disc_price": sum(for $i in $l return $i.l_extendedprice * (1 - $i.l_discount)),
"sum_charge": sum(for $i in $l return $i.l_extendedprice * (1 - $i.l_discount) * (1 + $i.l_tax)),
- "ave_qty": avg(for $i in $l return $i.l_quantity),
+ "ave_qty": avg(for $i in $l return $i.l_quantity),
"ave_price": avg(for $i in $l return $i.l_extendedprice),
"ave_disc": avg(for $i in $l return $i.l_discount),
"count_order": count($l)
-}
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..6b277db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as
+ closed {
+ l_orderkey : integer,
+ l_partkey : integer,
+ l_suppkey : integer,
+ l_linenumber : integer,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp
new file mode 100644
index 0000000..39205db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp
new file mode 100644
index 0000000..a3b7589
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+SELECT inject_failure(l_orderkey, l_orderkey=5988), SUM(l_quantity) t_sum_quantity
+FROM LineItem l
+GROUP BY l_orderkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..6b277db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as
+ closed {
+ l_orderkey : integer,
+ l_partkey : integer,
+ l_suppkey : integer,
+ l_linenumber : integer,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp
new file mode 100644
index 0000000..39205db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp
new file mode 100644
index 0000000..c11a0ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+SELECT inject_failure(l_orderkey, l_orderkey=5988), SUM(l_quantity) t_sum_quantity
+FROM LineItem l
+/*+ hash */
+GROUP BY l_orderkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..6b277db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as
+ closed {
+ l_orderkey : integer,
+ l_partkey : integer,
+ l_suppkey : integer,
+ l_linenumber : integer,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp
new file mode 100644
index 0000000..39205db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp
new file mode 100644
index 0000000..0314a6e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+SELECT inject_failure(l_orderkey, l_orderkey=1024), l_quantity
+FROM LineItem l
+ORDER BY l_shipdate, l_orderkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp
index f91df13..5ebec80 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp
@@ -20,10 +20,10 @@
drop dataverse tpch if exists;
create dataverse tpch;
-use test;
+use tpch;
-create type test.LineItemType as
+create type LineItemType as
closed {
l_orderkey : integer,
l_partkey : integer,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp
index 5fe734c..0340837 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp
@@ -20,5 +20,6 @@
use tpch;
-load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp
index cac4a08..e96644b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp
@@ -17,33 +17,22 @@
* under the License.
*/
-use tpch;
+USE tpch;
+SET `import-private-functions` "true";
-select element {'l_returnflag':l_returnflag,'l_linestatus':l_linestatus,'sum_qty':tpch.coll_sum((
- select element i.l_quantity
- from l as i
- )),'sum_base_price':tpch.coll_sum((
- select element i.l_extendedprice
- from l as i
- )),'sum_disc_price':tpch.coll_sum((
- select element (i.l_extendedprice * (1 - i.l_discount))
- from l as i
- )),'sum_charge':tpch.coll_sum((
- select element (i.l_extendedprice * (1 - i.l_discount) * (1 + i.l_tax))
- from l as i
- )),'ave_qty':tpch.coll_avg((
- select element i.l_quantity
- from l as i
- )),'ave_price':tpch.coll_avg((
- select element i.l_extendedprice
- from l as i
- )),'ave_disc':tpch.coll_avg((
- select element i.l_discount
- from l as i
- )),'count_order':tpch.coll_count(l)}
-from LineItem as l
-/* +hash */
-group by l.l_returnflag as l_returnflag,l.l_linestatus as l_linestatus
-order by l_returnflag,l_linestatus
+SELECT l_returnflag,
+ l_linestatus,
+ sum(l_quantity) AS sum_qty,
+ sum(l_extendedprice) AS sum_base_price,
+ sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+ avg(l_quantity) AS ave_qty,
+ avg(l_extendedprice) AS ave_price,
+ avg(l_discount) AS ave_disc,
+ count(1) AS count_order
+FROM LineItem l
+WHERE l.l_shipdate <= '1998-09-02' AND inject_failure(true, l.l_orderkey=5988)
+GROUP BY l_returnflag, l_linestatus
+ORDER BY l_returnflag, l_linestatus
;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..9e64c15
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as {
+ l_orderkey : bigint,
+ l_partkey : bigint,
+ l_suppkey : bigint,
+ l_linenumber : bigint,
+ l_quantity : bigint,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+}
+
+create type OrderType as {
+ o_orderkey : bigint,
+ o_custkey : bigint,
+ o_orderstatus : string,
+ o_totalprice : double,
+ o_orderdate : string,
+ o_orderpriority : string,
+ o_clerk : string,
+ o_shippriority : bigint,
+ o_comment : string
+}
+
+create type CustomerType as {
+ c_custkey : bigint,
+ c_name : string,
+ c_address : string,
+ c_nationkey : bigint,
+ c_phone : string,
+ c_acctbal : double,
+ c_mktsegment : string,
+ c_comment : string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
+create dataset Orders(OrderType) primary key o_orderkey;
+
+create dataset Customer(CustomerType) primary key c_custkey;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp
new file mode 100644
index 0000000..83ec6c3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),
+(`delimiter`=`|`));
+
+load dataset Orders using localfs ((`path`=`asterix_nc1://data/tpch0.001/orders.tbl`),(`format`=`delimited-text`),
+(`delimiter`=`|`));
+
+load dataset Customer using localfs ((`path`=`asterix_nc1://data/tpch0.001/customer.tbl`),(`format`=`delimited-text`),
+(`delimiter`=`|`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp
new file mode 100644
index 0000000..35acf08
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+WITH tmp AS
+(
+ SELECT l_orderkey, SUM(l_quantity) t_sum_quantity
+ FROM LineItem
+ GROUP BY l_orderkey
+)
+
+SELECT c.c_name, c.c_custkey, inject_failure(o.o_orderkey, o.o_orderkey=5988),
+ o.o_orderdate, o.o_totalprice, l.l_quantity
+FROM LineItem l
+JOIN tmp t ON t.l_orderkey = l.l_orderkey
+JOIN Orders o ON o.o_orderkey = t.l_orderkey
+JOIN Customer c ON c.c_custkey = o.o_custkey
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0d8da65..431b215 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -2028,13 +2028,36 @@
</test-case>
</test-group>
<test-group name="failure">
- <!--
- <test-case FilePath="failure">
- <compilation-unit name="q1_pricing_summary_report_failure">
- <output-dir compare="Text">q1_pricing_summary_report_failure</output-dir>
- </compilation-unit>
- </test-case>
- -->
+ <test-case FilePath="failure">
+ <compilation-unit name="group_by_failure">
+ <output-dir compare="Text">group_by_failure</output-dir>
+ <expected-error>Injected failure in asterix:inject-failure</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="failure">
+ <compilation-unit name="group_by_hash_failure">
+ <output-dir compare="Text">group_by_hash_failure</output-dir>
+ <expected-error>Injected failure in asterix:inject-failure</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="failure">
+ <compilation-unit name="q01_pricing_summary_report_failure">
+ <output-dir compare="Text">q01_pricing_summary_report_failure</output-dir>
+ <expected-error>Injected failure in asterix:inject-failure</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="failure">
+ <compilation-unit name="q18_large_volume_customer_failure">
+ <output-dir compare="Text">q18_large_volume_customer_failure</output-dir>
+ <expected-error>Injected failure in asterix:inject-failure</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="failure">
+ <compilation-unit name="order_by_failure">
+ <output-dir compare="Text">order_by_failure</output-dir>
+ <expected-error>Injected failure in asterix:inject-failure</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
<!--
<test-group name="flwor">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
index baf5dba..dd713e6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -155,6 +155,9 @@
public void close() throws HyracksDataException {
try {
appender.write(writer, true);
+ } catch (Exception e) {
+ writer.fail();
+ throw e;
} finally {
writer.close();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index cd04515..9982477 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -129,8 +129,9 @@
}
} catch (Throwable th) {
LOGGER.log(Level.WARNING, th.getMessage(), th);
+ } finally {
+ writer.close();
}
- writer.close();
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 99fff19..fe2d4ec 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -74,15 +74,17 @@
throw new RuntimeDataException(
ErrorCode.OPERATORS_FEED_INTAKE_OPERATOR_NODE_PUSHABLE_FAIL_AT_INGESTION);
}
- } catch (Throwable ie) {
+ } catch (Exception e) {
/*
- * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
- * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
- * The surviving intake partitions must continue to live and receive data from the external source.
+ * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another
+ * node involved in the Hyracks job. As the Intake job involves only the intake operator, the exception is
+ * indicative of a failure at the sibling intake operator location. The surviving intake partitions must
+ * continue to live and receive data from the external source.
*/
- throw new HyracksDataException(ie);
+ writer.fail();
+ throw e;
} finally {
- writer.close();
+ writer.close();
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index b2ffd6e..6869523 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -301,10 +301,11 @@
@Override
public void close() throws HyracksDataException {
try {
- cursor.close();
- writer.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
+ try {
+ cursor.close();
+ } finally {
+ writer.close();
+ }
} finally {
indexHelper.close();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
index c3e2681..377ad60 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
@@ -39,8 +39,11 @@
@Override
public void open() throws HyracksDataException {
- writer.open();
- writer.close();
+ try {
+ writer.open();
+ } finally {
+ writer.close();
+ }
}
};
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index d38c5b7..33078ff 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -170,8 +170,14 @@
if (isSink) {
return;
}
- flushIfNotFailed();
- writer.close();
+ try {
+ flushIfNotFailed();
+ } catch (Exception e) {
+ writer.fail();
+ throw e;
+ } finally {
+ writer.close();
+ }
appender.reset(frame, true);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index e94f4b7..2d8eaed 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -53,6 +53,9 @@
public void close() throws HyracksDataException {
try {
flushIfNotFailed();
+ } catch (Exception e) {
+ writer.fail();
+ throw e;
} finally {
writer.close();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 1123c5e..3ac8c40 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -21,8 +21,6 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -36,6 +34,8 @@
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 60d7eec..aefc99d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -179,7 +179,7 @@
@Override
public void fail() throws HyracksDataException {
if (isOpen) {
- writer.fail();
+ super.fail();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 8d312d5..b5bd1a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -53,7 +53,7 @@
public static final int RESULT_FAILURE_EXCEPTION = 16;
public static final int RESULT_FAILURE_NO_EXCEPTION = 17;
public static final int INCONSISTENT_RESULT_METADATA = 18;
- public static final int CANNOT_TRUNCATE_OR_DELETE_FILE = 19;
+ public static final int CANNOT_DELETE_FILE = 19;
public static final int NOT_A_JOBID = 20;
public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 7f90c35..f536d3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -37,7 +37,7 @@
16 = Failure producing result set %1$s for job %2$s
17 = No exception for failed result set %1$s for job %2$s
18 = Inconsistent metadata for result set %1$s"
-19 = Can't truncate or delete the file: %1$s
+19 = Cannot delete the file: %1$s
20 = '%1$s' is not a valid job id.
21 = The distributed job %1$s was not found
22 = The distributed job %1$s already exists
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index f7aa2e8..952eb75 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -55,6 +55,8 @@
private boolean partitionRegistered;
+ private boolean failed = false;
+
public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory) {
@@ -97,6 +99,7 @@
@Override
public void fail() throws HyracksDataException {
try {
+ failed = true;
resultState.closeAndDelete();
resultState.abort();
registerResultPartitionLocation(false);
@@ -111,8 +114,13 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("close(" + partition + ")");
}
- registerResultPartitionLocation(true);
- resultState.close();
+ try {
+ if (!failed) {
+ registerResultPartitionLocation(true);
+ }
+ } finally {
+ resultState.close();
+ }
try {
manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
} catch (HyracksException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index be7ed3d..c501b5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -86,6 +86,7 @@
public synchronized void close() {
eos.set(true);
+ closeWriteFileHandle();
notifyAll();
}
@@ -93,6 +94,13 @@
// Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs
// to be taken when there are more requests to these result states.
failed.set(true);
+ closeWriteFileHandle();
+ if (fileRef != null) {
+ fileRef.delete();
+ }
+ }
+
+ private void closeWriteFileHandle() {
if (writeFileHandle != null) {
try {
ioManager.close(writeFileHandle);
@@ -100,9 +108,6 @@
// Since file handle could not be closed, just ignore.
}
}
- if (fileRef != null) {
- fileRef.delete();
- }
}
public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
@@ -114,7 +119,6 @@
}
size += ioManager.syncWrite(writeFileHandle, size, buffer);
-
notifyAll();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index f0e7f0e..33b8980 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -72,7 +72,6 @@
}
public void close() throws IOException {
- channel.close();
raf.close();
}
@@ -80,10 +79,6 @@
return fileRef;
}
- public RandomAccessFile getRandomAccessFile() {
- return raf;
- }
-
public FileChannel getFileChannel() {
return channel;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
index e5e81ab..e51d2bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -80,10 +80,13 @@
@Override
public void close() throws HyracksDataException {
closeTime = System.currentTimeMillis();
- ((Task) ctx)
- .setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext().getJobId(),
- cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep));
- writer.close();
+ try {
+ ((Task) ctx).setPartitionSendProfile(
+ new PartitionProfile(new PartitionId(ctx.getJobletContext().getJobId(), cd.getConnectorId(),
+ senderIndex, receiverIndex), openTime, closeTime, mrep));
+ } finally {
+ writer.close();
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
index 365b01c..d9a4c7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
@@ -62,8 +62,14 @@
if (!open) {
throw new HyracksDataException("Closing SerializingDataWriter that has not been opened");
}
- tupleAppender.write(frameWriter, true);
- frameWriter.close();
+ try {
+ tupleAppender.write(frameWriter, true);
+ } catch (Exception e) {
+ frameWriter.fail();
+ throw e;
+ } finally {
+ frameWriter.close();
+ }
open = false;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index b69f377..f0bd318 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -29,7 +29,6 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.control.nc.io.FileHandle;
public class RunFileReader implements IFrameReader {
private final FileReference file;
@@ -49,7 +48,7 @@
@Override
public void open() throws HyracksDataException {
// Opens RW mode because we need to truncate the given file if required.
- handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_WRITE,
+ handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_ONLY,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
readPtr = 0;
}
@@ -87,12 +86,10 @@
public void close() throws HyracksDataException {
if (deleteAfterClose) {
try {
- // Truncates the file size to zero since OS might be keeping the file for a while.
- ((FileHandle) handle).getFileChannel().truncate(0);
ioManager.close(handle);
FileUtils.deleteQuietly(file.getFile());
} catch (IOException e) {
- throw HyracksDataException.create(ErrorCode.CANNOT_TRUNCATE_OR_DELETE_FILE, e, file.toString());
+ throw HyracksDataException.create(ErrorCode.CANNOT_DELETE_FILE, e, file.toString());
}
} else {
ioManager.close(handle);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index 8031422..915c63d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -27,8 +27,8 @@
import org.apache.hyracks.api.io.IIOManager;
public class RunFileWriter implements IFrameWriter {
- private final FileReference file;
private final IIOManager ioManager;
+ private FileReference file;
private boolean failed;
private IFileHandle handle;
@@ -69,6 +69,15 @@
}
}
+ public void erase() throws HyracksDataException {
+ close();
+ file.delete();
+
+ // Make sure we never access the file if it is deleted.
+ file = null;
+ handle = null;
+ }
+
public FileReference getFileReference() {
return file;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index 7d10802..c049b8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -107,10 +107,9 @@
@Override
public void close() throws HyracksDataException {
if (isFailed && state.getRuns() != null) {
- for (int i = 0; i < state.getRuns().length; i++) {
- RunFileWriter run = state.getRuns()[i];
+ for (RunFileWriter run : state.getRuns()) {
if (run != null) {
- run.getFileReference().delete();
+ run.erase();
}
}
} else {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
index 0dbb063..b17215f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
@@ -95,9 +95,14 @@
writer.open();
doPass(table, partitionRuns, numberOfTuples, writer, 1); // level 0 use used at build stage.
} catch (Exception e) {
- generatedRuns.forEach(run -> run.getFileReference().delete());
- writer.fail();
- throw new HyracksDataException(e);
+ try {
+ for (RunFileWriter run : generatedRuns) {
+ run.erase();
+ }
+ } finally {
+ writer.fail();
+ }
+ throw e;
} finally {
writer.close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
index e0ef2b3..d29e9ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
@@ -75,10 +75,17 @@
}
public void flushSpilledPartitions() throws HyracksDataException {
- for (int i = 0; i < runWriters.length; ++i) {
- if (runWriters[i] != null) {
- flushPartitionToRun(i, runWriters[i]);
- runWriters[i].close();
+ try {
+ for (int i = 0; i < runWriters.length; ++i) {
+ if (runWriters[i] != null) {
+ flushPartitionToRun(i, runWriters[i]);
+ }
+ }
+ } finally {
+ for (int i = 0; i < runWriters.length; ++i) {
+ if (runWriters[i] != null) {
+ runWriters[i].close();
+ }
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index ad14bad..b622c9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -470,8 +470,12 @@
@Override
public void close() throws HyracksDataException {
try {
- state.joiner.join(inBuffer.getBuffer(), writer);
- state.joiner.closeJoin(writer);
+ try {
+ state.joiner.join(inBuffer.getBuffer(), writer);
+ state.joiner.completeJoin(writer);
+ } finally {
+ state.joiner.releaseMemory();
+ }
ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
.createPartitioner();
ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
@@ -508,25 +512,35 @@
if (buildWriter != null) {
RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
- buildReader.open();
- while (buildReader.nextFrame(inBuffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
- FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
- joiner.build(copyBuffer);
- inBuffer.reset();
+ try {
+ buildReader.open();
+ while (buildReader.nextFrame(inBuffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
+ FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
+ joiner.build(copyBuffer);
+ inBuffer.reset();
+ }
+ } finally {
+ buildReader.close();
}
- buildReader.close();
}
// probe
RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
- probeReader.open();
- while (probeReader.nextFrame(inBuffer)) {
- joiner.join(inBuffer.getBuffer(), writer);
- inBuffer.reset();
+ try {
+ probeReader.open();
+ try {
+ while (probeReader.nextFrame(inBuffer)) {
+ joiner.join(inBuffer.getBuffer(), writer);
+ inBuffer.reset();
+ }
+ joiner.completeJoin(writer);
+ } finally {
+ joiner.releaseMemory();
+ }
+ } finally {
+ probeReader.close();
}
- probeReader.close();
- joiner.closeJoin(writer);
}
}
} finally {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 8e52838..ec1c3a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -196,8 +196,11 @@
accessorProbe.reset(newAccessorProbe.getBuffer());
}
- public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+ public void completeJoin(IFrameWriter writer) throws HyracksDataException {
appender.write(writer, true);
+ }
+
+ public void releaseMemory() throws HyracksDataException {
int nFrames = buffers.size();
// Frames assigned to the data table will be released here.
if (bufferManager != null) {
@@ -206,7 +209,6 @@
}
}
buffers.clear();
-
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
+ Thread.currentThread().getId() + ".");
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index a8d3f7e..cbeadd8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -275,9 +275,13 @@
@Override
public void close() throws HyracksDataException {
try {
- state.joiner.closeJoin(writer);
+ state.joiner.completeJoin(writer);
} finally {
- writer.close();
+ try {
+ state.joiner.releaseMemory();
+ } finally {
+ writer.close();
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 202aac6..16c21df 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -50,7 +50,6 @@
private final IFrame outBuffer;
private final IFrame innerBuffer;
private final VariableFrameMemoryManager outerBufferMngr;
- private RunFileReader runFileReader;
private final RunFileWriter runFileWriter;
private final boolean isLeftOuter;
private final ArrayTupleBuilder missingTupleBuilder;
@@ -103,14 +102,17 @@
public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
- runFileReader = runFileWriter.createReader();
- runFileReader.open();
- while (runFileReader.nextFrame(innerBuffer)) {
- for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
- blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+ RunFileReader runFileReader = runFileWriter.createReader();
+ try {
+ runFileReader.open();
+ while (runFileReader.nextFrame(innerBuffer)) {
+ for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
+ blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+ }
}
+ } finally {
+ runFileReader.close();
}
- runFileReader.close();
outerBufferMngr.reset();
if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
throw new HyracksDataException("The given outer frame of size:" + outerBuffer.capacity()
@@ -174,20 +176,25 @@
}
}
- public void closeJoin(IFrameWriter writer) throws HyracksDataException {
- runFileReader = runFileWriter.createDeleteOnCloseReader();
- runFileReader.open();
- while (runFileReader.nextFrame(innerBuffer)) {
- for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
- blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+ public void completeJoin(IFrameWriter writer) throws HyracksDataException {
+ RunFileReader runFileReader = runFileWriter.createDeleteOnCloseReader();
+ try {
+ runFileReader.open();
+ while (runFileReader.nextFrame(innerBuffer)) {
+ for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
+ blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+ }
}
+ } finally {
+ runFileReader.close();
}
- runFileReader.close();
- outerBufferMngr.reset();
-
appender.write(writer, true);
}
+ public void releaseMemory() throws HyracksDataException {
+ outerBufferMngr.reset();
+ }
+
private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
throws HyracksDataException {
int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 09b7544..5d79f75 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -173,6 +173,7 @@
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private JoinCacheTaskState state;
+ boolean failed = false;
@Override
public void open() throws HyracksDataException {
@@ -188,8 +189,24 @@
@Override
public void close() throws HyracksDataException {
+ if (failed) {
+ try {
+ state.joiner.closeCache();
+ } finally {
+ writer.close();
+ }
+ return;
+ }
try {
- state.joiner.closeJoin(writer);
+ try {
+ state.joiner.completeJoin(writer);
+ } finally {
+ state.joiner.releaseMemory();
+ }
+ } catch (Exception e) {
+ state.joiner.closeCache();
+ writer.fail();
+ throw e;
} finally {
writer.close();
}
@@ -197,6 +214,7 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
writer.fail();
}
};
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 17f009e..a5e2f6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -240,10 +240,10 @@
/**
* In case of failure happens, we need to clear up the generated temporary files.
*/
- public void clearBuildTempFiles() {
+ public void clearBuildTempFiles() throws HyracksDataException {
for (int i = 0; i < buildRFWriters.length; i++) {
if (buildRFWriters[i] != null) {
- buildRFWriters[i].getFileReference().delete();
+ buildRFWriters[i].erase();
}
}
}
@@ -258,17 +258,22 @@
runFileWriters = probeRFWriters;
break;
}
-
- for (int pid = spilledStatus.nextSetBit(0); pid >= 0
- && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
- if (bufferManager.getNumTuples(pid) > 0) {
- bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
- bufferManager.clearPartition(pid);
+ try {
+ for (int pid = spilledStatus.nextSetBit(0); pid >= 0
+ && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
+ if (bufferManager.getNumTuples(pid) > 0) {
+ bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
+ bufferManager.clearPartition(pid);
+ }
}
- // It doesn't matter whether a spilled partition currently holds a tuple in memory or not.
- // The file that holds the corresponding spilled partition needs to be closed.
- if (runFileWriters[pid] != null) {
- runFileWriters[pid].close();
+ } finally {
+ // Force to close all run file writers.
+ if (runFileWriters != null) {
+ for (RunFileWriter runFileWriter : runFileWriters) {
+ if (runFileWriter != null) {
+ runFileWriter.close();
+ }
+ }
}
}
}
@@ -418,26 +423,28 @@
private boolean loadSpilledPartitionToMem(int pid, RunFileWriter wr) throws HyracksDataException {
RunFileReader r = wr.createReader();
- r.open();
- if (reloadBuffer == null) {
- reloadBuffer = new VSizeFrame(ctx);
- }
- while (r.nextFrame(reloadBuffer)) {
- accessorBuild.reset(reloadBuffer.getBuffer());
- for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
- if (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+ try {
+ r.open();
+ if (reloadBuffer == null) {
+ reloadBuffer = new VSizeFrame(ctx);
+ }
+ while (r.nextFrame(reloadBuffer)) {
+ accessorBuild.reset(reloadBuffer.getBuffer());
+ for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
+ if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+ continue;
+ }
// for some reason (e.g. due to fragmentation) if the inserting failed,
// we need to clear the occupied frames
bufferManager.clearPartition(pid);
- r.close();
return false;
}
}
+ // Closes and deletes the run file if it is already loaded into memory.
+ r.setDeleteAfterClose(true);
+ } finally {
+ r.close();
}
-
- // Closes and deletes the run file if it is already loaded into memory.
- r.setDeleteAfterClose(true);
- r.close();
spilledStatus.set(pid, false);
buildRFWriters[pid] = null;
return true;
@@ -538,10 +545,13 @@
return spilledStatus.nextSetBit(0) < 0;
}
- public void closeProbe(IFrameWriter writer) throws HyracksDataException {
+ public void completeProbe(IFrameWriter writer) throws HyracksDataException {
//We do NOT join the spilled partitions here, that decision is made at the descriptor level
//(which join technique to use)
- inMemJoiner.closeJoin(writer);
+ inMemJoiner.completeJoin(writer);
+ }
+
+ public void releaseResource() throws HyracksDataException {
inMemJoiner.closeTable();
closeAllSpilledPartitions(SIDE.PROBE);
bufferManager.close();
@@ -553,10 +563,10 @@
/**
* In case of failure happens, we need to clear up the generated temporary files.
*/
- public void clearProbeTempFiles() {
+ public void clearProbeTempFiles() throws HyracksDataException {
for (int i = 0; i < probeRFWriters.length; i++) {
if (probeRFWriters[i] != null) {
- probeRFWriters[i].getFileReference().delete();
+ probeRFWriters[i].erase();
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index a72c0c6..d5e3568 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -385,6 +385,7 @@
private FrameTupleAppender nullResultAppender = null;
private FrameTupleAccessor probeTupleAccessor;
+ private boolean failed = false;
@Override
public void open() throws HyracksDataException {
@@ -406,21 +407,33 @@
@Override
public void fail() throws HyracksDataException {
- state.hybridHJ.clearProbeTempFiles();
+ failed = true;
writer.fail();
}
@Override
public void close() throws HyracksDataException {
+ if (failed) {
+ try {
+ // Clear temp files if fail() was called.
+ state.hybridHJ.clearBuildTempFiles();
+ state.hybridHJ.clearProbeTempFiles();
+ } finally {
+ writer.close(); // writer should always be closed.
+ }
+ logProbeComplete();
+ return;
+ }
try {
- state.hybridHJ.closeProbe(writer);
-
+ try {
+ state.hybridHJ.completeProbe(writer);
+ } finally {
+ state.hybridHJ.releaseResource();
+ }
BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
-
rPartbuff.reset();
for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus
.nextSetBit(pid + 1)) {
-
RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
@@ -434,10 +447,25 @@
int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
joinPartitionPair(bReader, pReader, bSize, pSize, 1);
}
-
+ } catch (Exception e) {
+ // Since writer.nextFrame() is called in the above "try" body, we have to call writer.fail()
+ // to send the failure signal to the downstream, when there is a throwable thrown.
+ writer.fail();
+ // Clear temp files as this.fail() nor this.close() will no longer be called after close().
+ state.hybridHJ.clearBuildTempFiles();
+ state.hybridHJ.clearProbeTempFiles();
+ // Re-throw the whatever is caught.
+ throw e;
} finally {
- writer.close();
+ try {
+ logProbeComplete();
+ } finally {
+ writer.close();
+ }
}
+ }
+
+ private void logProbeComplete() {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
}
@@ -542,9 +570,7 @@
boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
&& buildKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
-
assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
-
OptimizedHybridHashJoin rHHj;
int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeKeys,
@@ -552,79 +578,107 @@
nonMatchWriterFactories); //checked-confirmed
rHHj.setIsReversed(isReversed);
- buildSideReader.open();
- rHHj.initBuild();
- rPartbuff.reset();
- while (buildSideReader.nextFrame(rPartbuff)) {
- rHHj.build(rPartbuff.getBuffer());
+ try {
+ buildSideReader.open();
+ try {
+ rHHj.initBuild();
+ rPartbuff.reset();
+ while (buildSideReader.nextFrame(rPartbuff)) {
+ rHHj.build(rPartbuff.getBuffer());
+ }
+ } finally {
+ // Makes sure that files are always properly closed.
+ rHHj.closeBuild();
+ }
+ } finally {
+ buildSideReader.close();
}
- rHHj.closeBuild();
- buildSideReader.close();
- probeSideReader.open();
- rHHj.initProbe();
- rPartbuff.reset();
- while (probeSideReader.nextFrame(rPartbuff)) {
- rHHj.probe(rPartbuff.getBuffer(), writer);
+ try {
+ probeSideReader.open();
+ rPartbuff.reset();
+ try {
+ rHHj.initProbe();
+ while (probeSideReader.nextFrame(rPartbuff)) {
+ rHHj.probe(rPartbuff.getBuffer(), writer);
+ }
+ rHHj.completeProbe(writer);
+ } finally {
+ rHHj.releaseResource();
+ }
+ } finally {
+ // Makes sure that files are always properly closed.
+ probeSideReader.close();
}
- rHHj.closeProbe(writer);
- probeSideReader.close();
- int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
- int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
- int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
+ try {
+ int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
+ int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
+ int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
- BitSet rPStatus = rHHj.getPartitionStatus();
- if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH "
- + "(isLeftOuter || build<probe) - [Level " + level + "]");
- }
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
- RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
- RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
- int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
- int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
+ BitSet rPStatus = rHHj.getPartitionStatus();
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
+ //Case 2.1.1 - Keep applying HHJ
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH "
+ + "(isLeftOuter || build<probe) - [Level " + level + "]");
+ }
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+ RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+ int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
+ int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
- if (rbrfw == null || rprfw == null) {
- if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role.
- appendNullToProbeTuples(rprfw);
+ if (rbrfw == null || rprfw == null) {
+ if (isLeftOuter && rprfw != null) {
+ // For the outer join, we don't reverse the role.
+ appendNullToProbeTuples(rprfw);
+ }
+ continue;
}
- continue;
- }
- if (isReversed) {
- joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
- } else {
- joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
- }
- }
-
- } else { //Case 2.1.2 - Switch to NLJ
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine(
- "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe)"
- + " - [Level " + level + "]");
- }
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
- RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
- RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
- if (rbrfw == null || rprfw == null) {
- if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role.
- appendNullToProbeTuples(rprfw);
+ if (isReversed) {
+ joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
+ } else {
+ joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
}
- continue;
}
- int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
- int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
- // NLJ order is outer + inner, the order is reversed from the other joins
- if (isLeftOuter || probeSideInTups < buildSideInTups) {
- applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw); //checked-modified
- } else {
- applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw); //checked-modified
+ } else { //Case 2.1.2 - Switch to NLJ
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine(
+ "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH "
+ + "(isLeftOuter || build<probe) - [Level " + level + "]");
+ }
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+ RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+ if (rbrfw == null || rprfw == null) {
+ if (isLeftOuter && rprfw != null) {
+ // For the outer join, we don't reverse the role.
+ appendNullToProbeTuples(rprfw);
+ }
+ continue;
+ }
+
+ int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
+ int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
+ // NLJ order is outer + inner, the order is reversed from the other joins
+ if (isLeftOuter || probeSideInTups < buildSideInTups) {
+ //checked-modified
+ applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw);
+ } else {
+ //checked-modified
+ applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw);
+ }
}
}
+ } catch (Exception e) {
+ // Make sure that temporary run files generated in recursive hybrid hash joins
+ // are closed and deleted.
+ rHHj.clearBuildTempFiles();
+ rHHj.clearProbeTempFiles();
+ throw e;
}
}
@@ -635,17 +689,20 @@
if (probeTupleAccessor == null) {
probeTupleAccessor = new FrameTupleAccessor(probeRd);
}
- probReader.open();
- while (probReader.nextFrame(rPartbuff)) {
- probeTupleAccessor.reset(rPartbuff.getBuffer());
- for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
- FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
+ try {
+ probReader.open();
+ while (probReader.nextFrame(rPartbuff)) {
+ probeTupleAccessor.reset(rPartbuff.getBuffer());
+ for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
+ FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
nullTupleBuild.getSize());
+ }
}
+ nullResultAppender.write(writer, true);
+ } finally {
+ probReader.close();
}
- probReader.close();
- nullResultAppender.write(writer, true);
}
private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
@@ -654,9 +711,7 @@
throws HyracksDataException {
boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
&& bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
-
assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
-
IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx,
state.memForJoin * ctx.getInitialFrameSize());
ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
@@ -667,39 +722,52 @@
new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nonMatchWriter, table,
predEvaluator, isReversed, bufferManager);
- bReader.open();
- rPartbuff.reset();
- while (bReader.nextFrame(rPartbuff)) {
- // We need to allocate a copyBuffer, because this buffer gets added to the buffers list
- // in the InMemoryHashJoin.
- ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
- // If a frame cannot be allocated, there may be a chance if we can compact the table,
- // one or more frame may be reclaimed.
- if (copyBuffer == null) {
- if (joiner.compactHashTable() > 0) {
- copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
- }
+ try {
+ bReader.open();
+ rPartbuff.reset();
+ while (bReader.nextFrame(rPartbuff)) {
+ // We need to allocate a copyBuffer, because this buffer gets added to the buffers list
+ // in the InMemoryHashJoin.
+ ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
+ // If a frame cannot be allocated, there may be a chance if we can compact the table,
+ // one or more frame may be reclaimed.
if (copyBuffer == null) {
- // Still no frame is allocated? At this point, we have no way to get a frame.
- throw new HyracksDataException(
- "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
+ if (joiner.compactHashTable() > 0) {
+ copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
+ }
+ if (copyBuffer == null) {
+ // Still no frame is allocated? At this point, we have no way to get a frame.
+ throw new HyracksDataException(
+ "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
+ }
}
+ FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
+ joiner.build(copyBuffer);
+ rPartbuff.reset();
}
- FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
- joiner.build(copyBuffer);
- rPartbuff.reset();
+ } finally {
+ bReader.close();
}
- bReader.close();
- rPartbuff.reset();
- //probe
- pReader.open();
- while (pReader.nextFrame(rPartbuff)) {
- joiner.join(rPartbuff.getBuffer(), writer);
+ try {
+ //probe
+ pReader.open();
rPartbuff.reset();
+ try {
+ while (pReader.nextFrame(rPartbuff)) {
+ joiner.join(rPartbuff.getBuffer(), writer);
+ rPartbuff.reset();
+ }
+ joiner.completeJoin(writer);
+ } finally {
+ joiner.releaseMemory();
+ }
+ } finally {
+ try {
+ pReader.close();
+ } finally {
+ joiner.closeTable();
+ }
}
- pReader.close();
- joiner.closeJoin(writer);
- joiner.closeTable();
}
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
@@ -716,40 +784,38 @@
nlj.setIsReversed(isReversed);
IFrame cacheBuff = new VSizeFrame(ctx);
- innerReader.open();
- while (innerReader.nextFrame(cacheBuff)) {
- nlj.cache(cacheBuff.getBuffer());
- cacheBuff.reset();
+ try {
+ innerReader.open();
+ while (innerReader.nextFrame(cacheBuff)) {
+ nlj.cache(cacheBuff.getBuffer());
+ cacheBuff.reset();
+ }
+ } finally {
+ try {
+ nlj.closeCache();
+ } finally {
+ innerReader.close();
+ }
}
- nlj.closeCache();
-
- IFrame joinBuff = new VSizeFrame(ctx);
- outerReader.open();
-
- while (outerReader.nextFrame(joinBuff)) {
- nlj.join(joinBuff.getBuffer(), writer);
- joinBuff.reset();
+ try {
+ IFrame joinBuff = new VSizeFrame(ctx);
+ outerReader.open();
+ try {
+ while (outerReader.nextFrame(joinBuff)) {
+ nlj.join(joinBuff.getBuffer(), writer);
+ joinBuff.reset();
+ }
+ nlj.completeJoin(writer);
+ } finally {
+ nlj.releaseMemory();
+ }
+ } finally {
+ outerReader.close();
}
-
- nlj.closeJoin(writer);
- outerReader.close();
- innerReader.close();
}
};
return op;
}
}
- public void setSkipInMemHJ(boolean b) {
- skipInMemoryHJ = b;
- }
-
- public void setForceNLJ(boolean b) {
- forceNLJ = b;
- }
-
- public void setForceRR(boolean b) {
- forceRoleReversal = !isLeftOuter && b;
- }
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 4253114..e7da174 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -82,14 +82,12 @@
while (in.nextFrame(frame)) {
writer.nextFrame(frame.getBuffer());
}
- } catch (Throwable th) {
- throw new HyracksDataException(th);
} finally {
in.close();
}
- } catch (Throwable th) {
+ } catch (Exception e) {
writer.fail();
- throw new HyracksDataException(th);
+ throw e;
} finally {
writer.close();
if (numConsumers.decrementAndGet() == 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 90b4b6c..b422ef4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -80,7 +80,8 @@
final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(outRecordDesc);
return new AbstractUnaryInputSinkOperatorNodePushable() {
- IFrameWriter datasetPartitionWriter;
+ private IFrameWriter datasetPartitionWriter;
+ private boolean failed = false;
@Override
public void open() throws HyracksDataException {
@@ -110,15 +111,22 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
datasetPartitionWriter.fail();
}
@Override
public void close() throws HyracksDataException {
- if (frameOutputStream.getTupleCount() > 0) {
- frameOutputStream.flush(datasetPartitionWriter);
+ try {
+ if (!failed && frameOutputStream.getTupleCount() > 0) {
+ frameOutputStream.flush(datasetPartitionWriter);
+ }
+ } catch (Exception e) {
+ datasetPartitionWriter.fail();
+ throw e;
+ } finally {
+ datasetPartitionWriter.close();
}
- datasetPartitionWriter.close();
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index 6d9d085..f4158ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -84,10 +84,13 @@
finalWriter = prepareSkipMergingFinalResultWriter(writer);
finalWriter.open();
if (sorter != null) {
- if (sorter.hasRemaining()) {
- sorter.flush(finalWriter);
+ try {
+ if (sorter.hasRemaining()) {
+ sorter.flush(finalWriter);
+ }
+ } finally {
+ sorter.close();
}
- sorter.close();
}
} else {
/** recycle sort buffer */
@@ -128,10 +131,15 @@
RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
- mergeResultWriter.open();
- merge(mergeResultWriter, partialRuns);
- mergeResultWriter.close();
-
+ try {
+ mergeResultWriter.open();
+ merge(mergeResultWriter, partialRuns);
+ } catch (Throwable t) {
+ mergeResultWriter.fail();
+ throw t;
+ } finally {
+ mergeResultWriter.close();
+ }
reader = mergeFileWriter.createReader();
}
runs.add(reader);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
index cdabcda..ef9e4b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
@@ -103,7 +103,7 @@
writer.nextFrame(frame);
} catch (Exception e) {
writer.fail();
- throw new HyracksDataException(e);
+ throw e;
} finally {
writer.close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
index bba18b3..683857f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
@@ -84,9 +84,9 @@
public void initialize() throws HyracksDataException {
try {
writer.open();
- } catch (Throwable th) {
+ } catch (Exception e) {
writer.fail();
- throw new HyracksDataException(th);
+ throw e;
} finally {
writer.close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
index f83ab6a..521dff1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
@@ -227,12 +227,14 @@
ResultValidateWriter writer = new ResultValidateWriter(keyValueMap);
- getBuilder().open();
- for (IFrame frame : input) {
- getBuilder().nextFrame(frame.getBuffer());
+ try {
+ getBuilder().open();
+ for (IFrame frame : input) {
+ getBuilder().nextFrame(frame.getBuffer());
+ }
+ } finally {
+ getBuilder().close();
}
- getBuilder().close();
-
getMerger().setOutputFrameWriter(0, writer, outputRec);
getMerger().initialize();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index 673c6fa..cfd4f30 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -130,13 +130,16 @@
assertTrue(runs.size() > 0);
for (GeneratedRunFileReader run : runs) {
- run.open();
- int preKey = Integer.MIN_VALUE;
- while (run.nextFrame(frame)) {
- fta.reset(frame.getBuffer());
- preKey = assertFTADataIsSorted(fta, keyValuePair, preKey);
+ try {
+ run.open();
+ int preKey = Integer.MIN_VALUE;
+ while (run.nextFrame(frame)) {
+ fta.reset(frame.getBuffer());
+ preKey = assertFTADataIsSorted(fta, keyValuePair, preKey);
+ }
+ } finally {
+ run.close();
}
- run.close();
}
assertTrue(keyValuePair.isEmpty());
}