Fix Error in Aborting Task in Super Activity
When aborting a task, its thread gets interrupted. This creates
a problem when interrupting
Change-Id: I603d3c101e0a4de4816eb5a6a7fd4320df317ce4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/582
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 4e842bb..827e478 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -196,24 +196,26 @@
private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException {
List<Future<Void>> initializationTasks = new ArrayList<Future<Void>>();
- // Run one action for all OperatorNodePushables in parallel through a thread pool.
- for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
- initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- opAction.runAction(op);
- return null;
- }
- }));
- }
-
- // Waits until all parallel actions to finish.
- for (Future<Void> initializationTask : initializationTasks) {
- try {
- initializationTask.get();
- } catch (Exception e) {
- throw new HyracksDataException(e);
+ try {
+ // Run one action for all OperatorNodePushables in parallel through a thread pool.
+ for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
+ initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ opAction.runAction(op);
+ return null;
+ }
+ }));
}
+ // Waits until all parallel actions to finish.
+ for (Future<Void> initializationTask : initializationTasks) {
+ initializationTask.get();
+ }
+ } catch (Throwable th) {
+ for (Future<Void> initializationTask : initializationTasks) {
+ initializationTask.cancel(true);
+ }
+ throw new HyracksDataException(th);
}
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 12df264..61baf82 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -255,8 +255,8 @@
addPendingThread(ct);
try {
ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
- operator.initialize();
try {
+ operator.initialize();
if (collectors.length > 0) {
final Semaphore sem = new Semaphore(collectors.length - 1);
for (int i = 1; i < collectors.length; ++i) {