Cleanup Task printStackTrace/interrupted handling/misc
Change-Id: I1ea6d4d6d8108768503e4ab11fe504423d76c291
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1824
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 3c70dba..b52a6a5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -91,6 +91,7 @@
public static final int RESOURCE_DOES_NOT_EXIST = 55;
public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56;
public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57;
+ public static final int TASK_ABORTED = 58;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 604b534..35a2fc5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -18,8 +18,6 @@
#
# 0 --- 9999: runtime errors
-# 10000 ---- 19999: compilation errors
-
1 = Unsupported operation %1$s in %2$s operator
2 = Error in processing tuple %1$s in a frame
4 = The file with absolute path %1$s is not within any of the current IO devices
@@ -53,8 +51,8 @@
32 = No record for partition %1$s of result set %2$s
33 = Inserting duplicate keys into the primary storage
34 = Cannot load an index that is not empty
-35 = Modify not supported in External LSM Inedx
-36 = Flush not supported in External LSM Inedx
+35 = Modify not supported in External LSM Index
+36 = Flush not supported in External LSM Index
37 = Index key not found
38 = Index is not updatable
39 = Merge Threshold is less than or equal to 0
@@ -76,4 +74,7 @@
55 = Resource does not exist for %1$s
56 = LSM disk component scan is not allowed for a secondary index
57 = Couldn't find the matter tuple for anti-matter tuple in the primary index
+58 = Task %1$s was aborted
+
+# 10000 ---- 19999: compilation errors
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 7224b49..04d48f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.nc;
+import static org.apache.hyracks.api.exceptions.ErrorCode.TASK_ABORTED;
+
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -29,6 +31,8 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.comm.IFrameReader;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -64,6 +68,8 @@
import org.apache.hyracks.control.nc.work.NotifyTaskFailureWork;
public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
+ private static final Logger LOGGER = Logger.getLogger(Task.class.getName());
+
private final Joblet joblet;
private final TaskAttemptId taskAttemptId;
@@ -262,7 +268,7 @@
// Calls synchronized addPendingThread(..) to make sure that in the abort() method,
// the thread is not escaped from interruption.
if (!addPendingThread(ct)) {
- exceptions.add(new InterruptedException("Task " + getTaskAttemptId() + " was aborted!"));
+ exceptions.add(HyracksDataException.create(TASK_ABORTED, getTaskAttemptId()));
ExceptionUtils.setNodeIds(exceptions, ncs.getId());
ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
return;
@@ -278,29 +284,26 @@
final IFrameWriter writer = operator.getInputFrameWriter(i);
sem.acquire();
final int cIdx = i;
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- Thread thread = Thread.currentThread();
- // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
- // the thread is not escaped from interruption.
- if (!addPendingThread(thread)) {
- return;
+ executorService.execute(() -> {
+ Thread thread = Thread.currentThread();
+ // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
+ // the thread is not escaped from interruption.
+ if (!addPendingThread(thread)) {
+ return;
+ }
+ String oldName = thread.getName();
+ thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+ thread.setPriority(Thread.MIN_PRIORITY);
+ try {
+ pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
+ } catch (HyracksDataException e) {
+ synchronized (Task.this) {
+ exceptions.add(e);
}
- String oldName = thread.getName();
- thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
- thread.setPriority(Thread.MIN_PRIORITY);
- try {
- pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
- } catch (HyracksDataException e) {
- synchronized (Task.this) {
- exceptions.add(e);
- }
- } finally {
- thread.setName(oldName);
- sem.release();
- removePendingThread(thread);
- }
+ } finally {
+ thread.setName(oldName);
+ sem.release();
+ removePendingThread(thread);
}
});
}
@@ -315,6 +318,9 @@
}
NodeControllerService ncs = joblet.getNodeController();
ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
+ } catch (InterruptedException e) {
+ exceptions.add(e);
+ Thread.currentThread().interrupt();
} catch (Exception e) {
exceptions.add(e);
} finally {
@@ -323,8 +329,13 @@
removePendingThread(ct);
}
if (!exceptions.isEmpty()) {
- for (Exception e : exceptions) {
- e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ for (int i = 0; i < exceptions.size(); i++) {
+ LOGGER.log(Level.WARNING,
+ "Task " + taskAttemptId + " failed with exception"
+ + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" + exceptions.size() + ")" : ""),
+ exceptions.get(i));
+ }
}
NodeControllerService ncs = joblet.getNodeController();
ExceptionUtils.setNodeIds(exceptions, ncs.getId());
@@ -340,7 +351,7 @@
try {
collector.open();
try {
- if (inputChannels.size() <= 0) {
+ if (inputChannels.isEmpty()) {
joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
PartitionState.STARTED);
} else {
@@ -349,8 +360,8 @@
IFrameReader reader = collector.getReader();
reader.open();
try {
- writer.open();
try {
+ writer.open();
VSizeFrame frame = new VSizeFrame(this);
while (reader.nextFrame(frame)) {
if (aborted) {
@@ -361,7 +372,11 @@
buffer.compact();
}
} catch (Exception e) {
- writer.fail();
+ try {
+ writer.fail();
+ } catch (HyracksDataException e1) {
+ e.addSuppressed(e1);
+ }
throw e;
} finally {
writer.close();