Cleanup Task printStackTrace/interrupted handling/misc

Change-Id: I1ea6d4d6d8108768503e4ab11fe504423d76c291
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1824
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 3c70dba..b52a6a5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -91,6 +91,7 @@
     public static final int RESOURCE_DOES_NOT_EXIST = 55;
     public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56;
     public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57;
+    public static final int TASK_ABORTED = 58;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 604b534..35a2fc5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -18,8 +18,6 @@
 #
 
 # 0 --- 9999: runtime errors
-# 10000 ---- 19999: compilation errors
-
 1 = Unsupported operation %1$s in %2$s operator
 2 = Error in processing tuple %1$s in a frame
 4 = The file with absolute path %1$s is not within any of the current IO devices
@@ -53,8 +51,8 @@
 32 = No record for partition %1$s of result set %2$s
 33 = Inserting duplicate keys into the primary storage
 34 = Cannot load an index that is not empty
-35 = Modify not supported in External LSM Inedx
-36 = Flush not supported in External LSM Inedx
+35 = Modify not supported in External LSM Index
+36 = Flush not supported in External LSM Index
 37 = Index key not found
 38 = Index is not updatable
 39 = Merge Threshold is less than or equal to 0
@@ -76,4 +74,7 @@
 55 = Resource does not exist for %1$s
 56 = LSM disk component scan is not allowed for a secondary index
 57 = Couldn't find the matter tuple for anti-matter tuple in the primary index
+58 = Task %1$s was aborted
+
+# 10000 ---- 19999: compilation errors
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 7224b49..04d48f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.control.nc;
 
+import static org.apache.hyracks.api.exceptions.ErrorCode.TASK_ABORTED;
+
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -29,6 +31,8 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrameReader;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -64,6 +68,8 @@
 import org.apache.hyracks.control.nc.work.NotifyTaskFailureWork;
 
 public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
+    private static final Logger LOGGER = Logger.getLogger(Task.class.getName());
+
     private final Joblet joblet;
 
     private final TaskAttemptId taskAttemptId;
@@ -262,7 +268,7 @@
         // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
         // the thread is not escaped from interruption.
         if (!addPendingThread(ct)) {
-            exceptions.add(new InterruptedException("Task " + getTaskAttemptId() + " was aborted!"));
+            exceptions.add(HyracksDataException.create(TASK_ABORTED, getTaskAttemptId()));
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
             ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
             return;
@@ -278,29 +284,26 @@
                         final IFrameWriter writer = operator.getInputFrameWriter(i);
                         sem.acquire();
                         final int cIdx = i;
-                        executorService.execute(new Runnable() {
-                            @Override
-                            public void run() {
-                                Thread thread = Thread.currentThread();
-                                // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
-                                // the thread is not escaped from interruption.
-                                if (!addPendingThread(thread)) {
-                                    return;
+                        executorService.execute(() -> {
+                            Thread thread = Thread.currentThread();
+                            // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
+                            // the thread is not escaped from interruption.
+                            if (!addPendingThread(thread)) {
+                                return;
+                            }
+                            String oldName = thread.getName();
+                            thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+                            thread.setPriority(Thread.MIN_PRIORITY);
+                            try {
+                                pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
+                            } catch (HyracksDataException e) {
+                                synchronized (Task.this) {
+                                    exceptions.add(e);
                                 }
-                                String oldName = thread.getName();
-                                thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
-                                thread.setPriority(Thread.MIN_PRIORITY);
-                                try {
-                                    pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
-                                } catch (HyracksDataException e) {
-                                    synchronized (Task.this) {
-                                        exceptions.add(e);
-                                    }
-                                } finally {
-                                    thread.setName(oldName);
-                                    sem.release();
-                                    removePendingThread(thread);
-                                }
+                            } finally {
+                                thread.setName(oldName);
+                                sem.release();
+                                removePendingThread(thread);
                             }
                         });
                     }
@@ -315,6 +318,9 @@
             }
             NodeControllerService ncs = joblet.getNodeController();
             ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
+        } catch (InterruptedException e) {
+            exceptions.add(e);
+            Thread.currentThread().interrupt();
         } catch (Exception e) {
             exceptions.add(e);
         } finally {
@@ -323,8 +329,13 @@
             removePendingThread(ct);
         }
         if (!exceptions.isEmpty()) {
-            for (Exception e : exceptions) {
-                e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                for (int i = 0; i < exceptions.size(); i++) {
+                    LOGGER.log(Level.WARNING,
+                            "Task " + taskAttemptId + " failed with exception"
+                                    + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" + exceptions.size()  + ")" : ""),
+                            exceptions.get(i));
+                }
             }
             NodeControllerService ncs = joblet.getNodeController();
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
@@ -340,7 +351,7 @@
         try {
             collector.open();
             try {
-                if (inputChannels.size() <= 0) {
+                if (inputChannels.isEmpty()) {
                     joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
                             PartitionState.STARTED);
                 } else {
@@ -349,8 +360,8 @@
                 IFrameReader reader = collector.getReader();
                 reader.open();
                 try {
-                    writer.open();
                     try {
+                        writer.open();
                         VSizeFrame frame = new VSizeFrame(this);
                         while (reader.nextFrame(frame)) {
                             if (aborted) {
@@ -361,7 +372,11 @@
                             buffer.compact();
                         }
                     } catch (Exception e) {
-                        writer.fail();
+                        try {
+                            writer.fail();
+                        } catch (HyracksDataException e1) {
+                            e.addSuppressed(e1);
+                        }
                         throw e;
                     } finally {
                         writer.close();