[NO ISSUE][RT] Report all errors on SuperActivityOperatorNodePushable

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Currently, if a failure happens in SuperActivityOperatorNodePushable,
  we only report that failure and miss the rest of the failures.
  This is especially critical in case of job cancellation since we
  don't know where each thread was interrupted.
- After this change, we suppress all other failures in the root
  failure for reporting purposes.

Change-Id: Ibbf31dd91303ce2f606734fcccb19270875266b3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2500
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 83ab532..d499554 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
@@ -44,6 +45,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.util.ExceptionUtils;
 
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
@@ -193,15 +195,20 @@
     }
 
     private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException {
-        List<Future<Void>> tasks = new ArrayList<>();
+        List<Future<Void>> tasks = new ArrayList<>(operatorNodePushablesBFSOrder.size());
+        Queue<Throwable> failures = new ArrayBlockingQueue<>(operatorNodePushablesBFSOrder.size());
         final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
         final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
+        Throwable root = null;
         try {
             for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
                 tasks.add(ctx.getExecutorService().submit(() -> {
                     startSemaphore.release();
                     try {
                         action.run(op);
+                    } catch (Throwable th) { // NOSONAR: Must catch all causes of failure
+                        failures.offer(th);
+                        throw th;
                     } finally {
                         completeSemaphore.release();
                     }
@@ -211,13 +218,16 @@
             for (Future<Void> task : tasks) {
                 task.get();
             }
-        } catch (InterruptedException e) {
-            cancelTasks(tasks, startSemaphore, completeSemaphore);
-            Thread.currentThread().interrupt();
-            throw HyracksDataException.create(e);
         } catch (ExecutionException e) {
+            root = e.getCause();
+        } catch (Throwable e) { // NOSONAR: Must catch all causes of failure
+            root = e;
+        }
+        if (root != null) {
+            final Throwable failure = root;
             cancelTasks(tasks, startSemaphore, completeSemaphore);
-            throw HyracksDataException.create(e.getCause());
+            failures.forEach(t -> ExceptionUtils.suppress(failure, t));
+            throw HyracksDataException.create(failure);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index dcfc291..9d99968 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -102,7 +102,7 @@
 
     private volatile boolean aborted;
 
-    private NodeControllerService ncs;
+    private final NodeControllerService ncs;
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
@@ -286,67 +286,62 @@
         }
         ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
         try {
-            Exception operatorException = null;
+            Throwable operatorException = null;
             try {
                 operator.initialize();
                 if (collectors.length > 0) {
                     final Semaphore sem = new Semaphore(collectors.length - 1);
                     for (int i = 1; i < collectors.length; ++i) {
+                        // Q. Do we ever have a task that has more than one collector?
                         final IPartitionCollector collector = collectors[i];
                         final IFrameWriter writer = operator.getInputFrameWriter(i);
-                        sem.acquire();
+                        sem.acquireUninterruptibly();
                         final int cIdx = i;
                         executorService.execute(() -> {
-                            Thread thread = Thread.currentThread();
-                            // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
-                            // the thread is not escaped from interruption.
-                            if (!addPendingThread(thread)) {
-                                return;
-                            }
-                            thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
-                            thread.setPriority(Thread.MIN_PRIORITY);
                             try {
-                                pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
-                            } catch (HyracksDataException e) {
-                                synchronized (Task.this) {
-                                    exceptions.add(e);
+                                Thread thread = Thread.currentThread();
+                                // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
+                                // the thread is not escaped from interruption.
+                                if (!addPendingThread(thread)) {
+                                    return;
+                                }
+                                thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+                                thread.setPriority(Thread.MIN_PRIORITY);
+                                try {
+                                    pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
+                                } catch (HyracksDataException e) {
+                                    synchronized (Task.this) {
+                                        exceptions.add(e);
+                                    }
+                                } finally {
+                                    removePendingThread(thread);
                                 }
                             } finally {
                                 sem.release();
-                                removePendingThread(thread);
                             }
                         });
                     }
                     try {
                         pushFrames(collectors[0], inputChannelsFromConnectors.get(0), operator.getInputFrameWriter(0));
                     } finally {
-                        sem.acquire(collectors.length - 1);
+                        sem.acquireUninterruptibly(collectors.length - 1);
                     }
                 }
-            } catch (Exception e) {
-                // Store the operator exception
+            } catch (Throwable e) { // NOSONAR: Must catch all failures
                 operatorException = e;
-                throw e;
             } finally {
                 try {
                     operator.deinitialize();
-                } catch (Exception e) {
-                    if (operatorException != null) {
-                        // Add deinitialize exception to the operator exception to keep track of both
-                        operatorException.addSuppressed(e);
-                    } else {
-                        operatorException = e;
-                    }
-                    throw operatorException;
+                } catch (Throwable e) { // NOSONAR: Must catch all failures
+                    operatorException = ExceptionUtils.suppress(operatorException, e);
                 }
             }
-            NodeControllerService ncs = joblet.getNodeController();
+            if (operatorException != null) {
+                throw operatorException;
+            }
             ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
-        } catch (InterruptedException e) {
-            exceptions.add(e);
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            exceptions.add(e);
+        } catch (Throwable e) { // NOSONAR: Catch all failures
+            exceptions.add(HyracksDataException.create(e));
         } finally {
             close();
             removePendingThread(ct);
@@ -360,7 +355,6 @@
                             exceptions.get(i));
                 }
             }
-            NodeControllerService ncs = joblet.getNodeController();
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
             ncs.getWorkQueue()
                     .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId));
@@ -457,6 +451,7 @@
         return ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, start, length);
     }
 
+    @Override
     public Set<JobFlag> getJobFlags() {
         return jobFlags;
     }