Fix Error in Aborting Task in Super Activity

When aborting a task, its thread gets interrupted. This creates
a problem when interrupting

Change-Id: I603d3c101e0a4de4816eb5a6a7fd4320df317ce4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/582
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 4e842bb..827e478 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -196,24 +196,26 @@
 
     private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException {
         List<Future<Void>> initializationTasks = new ArrayList<Future<Void>>();
-        // Run one action for all OperatorNodePushables in parallel through a thread pool.
-        for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
-            initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    opAction.runAction(op);
-                    return null;
-                }
-            }));
-        }
-
-        // Waits until all parallel actions to finish.
-        for (Future<Void> initializationTask : initializationTasks) {
-            try {
-                initializationTask.get();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
+        try {
+            // Run one action for all OperatorNodePushables in parallel through a thread pool.
+            for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
+                initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        opAction.runAction(op);
+                        return null;
+                    }
+                }));
             }
+            // Waits until all parallel actions to finish.
+            for (Future<Void> initializationTask : initializationTasks) {
+                initializationTask.get();
+            }
+        } catch (Throwable th) {
+            for (Future<Void> initializationTask : initializationTasks) {
+                initializationTask.cancel(true);
+            }
+            throw new HyracksDataException(th);
         }
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 12df264..61baf82 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -255,8 +255,8 @@
         addPendingThread(ct);
         try {
             ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
-            operator.initialize();
             try {
+                operator.initialize();
                 if (collectors.length > 0) {
                     final Semaphore sem = new Semaphore(collectors.length - 1);
                     for (int i = 1; i < collectors.length; ++i) {