[ASTERIXDB-2002][HYR] Report failures during task start
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- failures that happened before the task object was created were
never reported: because the task object was still null, they
simply threw a null pointer exception.
Change-Id: Ibf79088c1ea08e66a7b130e4836f153ea9603723
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1900
Reviewed-by: Xikui Wang <xkkwww@gmail.com>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index def4c83..83ab532 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -61,7 +61,8 @@
private int inputArity = 0;
public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
- IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
this.parent = parent;
this.startActivities = startActivities;
this.ctx = ctx;
@@ -76,7 +77,7 @@
try {
init();
} catch (Exception e) {
- throw new IllegalStateException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 74a628d..bff2794 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -275,7 +275,8 @@
if (!addPendingThread(ct)) {
exceptions.add(HyracksDataException.create(TASK_ABORTED, getTaskAttemptId()));
ExceptionUtils.setNodeIds(exceptions, ncs.getId());
- ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
+ ncs.getWorkQueue()
+ .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId));
return;
}
ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
@@ -353,13 +354,14 @@
for (int i = 0; i < exceptions.size(); i++) {
LOGGER.log(Level.WARNING,
"Task " + taskAttemptId + " failed with exception"
- + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" + exceptions.size() + ")" : ""),
+ + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" + exceptions.size() + ")" : ""),
exceptions.get(i));
}
}
NodeControllerService ncs = joblet.getNodeController();
ExceptionUtils.setNodeIds(exceptions, ncs.getId());
- ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
+ ncs.getWorkQueue()
+ .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId));
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index e81fa5a..fa8ba28 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -19,7 +19,10 @@
package org.apache.hyracks.control.nc.work;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.work.AbstractWork;
@@ -27,30 +30,36 @@
import org.apache.hyracks.control.nc.Task;
public class NotifyTaskFailureWork extends AbstractWork {
+ private static final Logger LOGGER = Logger.getLogger(NotifyTaskFailureWork.class.getName());
private final NodeControllerService ncs;
private final Task task;
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
private final List<Exception> exceptions;
- public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions) {
+ public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions, JobId jobId,
+ TaskAttemptId taskId) {
this.ncs = ncs;
this.task = task;
this.exceptions = exceptions;
+ this.jobId = jobId;
+ this.taskId = taskId;
}
@Override
public void run() {
try {
- JobId jobId = task.getJobletContext().getJobId();
IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
if (dpm != null) {
dpm.abortReader(jobId);
}
- ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), exceptions);
- //exceptions.get(0).printStackTrace();
+ ncs.getClusterController().notifyTaskFailure(jobId, taskId, ncs.getId(), exceptions);
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.log(Level.SEVERE, "Failure reporting task failure to cluster controller", e);
}
- task.getJoblet().removeTask(task);
+ if (task != null) {
+ task.getJoblet().removeTask(task);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index b55cd4b..c369781 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -102,6 +102,7 @@
@Override
public void run() {
Task task = null;
+ int taskIndex = 0;
try {
ncs.updateMaxJobId(jobId);
NCServiceContext serviceCtx = ncs.getContext();
@@ -122,7 +123,8 @@
return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
}
};
- for (TaskAttemptDescriptor td : taskDescriptors) {
+ while (taskIndex < taskDescriptors.size()) {
+ TaskAttemptDescriptor td = taskDescriptors.get(taskIndex);
TaskAttemptId taId = td.getTaskAttemptId();
TaskId tid = taId.getTaskId();
ActivityId aid = tid.getActivityId();
@@ -133,6 +135,7 @@
}
final int partition = tid.getPartition();
List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
+ task = null;
task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
createInputChannels(td, inputs));
IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
@@ -174,13 +177,16 @@
task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
joblet.addTask(task);
task.start();
+ taskIndex++;
}
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Failure starting a task", e);
// notify cc of start task failure
List<Exception> exceptions = new ArrayList<>();
+ exceptions.add(e);
ExceptionUtils.setNodeIds(exceptions, ncs.getId());
- ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions));
+ TaskAttemptId taskId = taskDescriptors.get(taskIndex).getTaskAttemptId();
+ ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions, jobId, taskId));
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 05a7e2d..18479e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -59,8 +59,8 @@
private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName());
- public static final String[] ASTERIX_IDS = { "asterix-001", "asterix-002", "asterix-003", "asterix-004",
- "asterix-005", "asterix-006", "asterix-007" };
+ public static final String[] ASTERIX_IDS =
+ { "asterix-001", "asterix-002", "asterix-003", "asterix-004", "asterix-005", "asterix-006", "asterix-007" };
private static ClusterControllerService cc;
@@ -103,7 +103,7 @@
ncConfig.setClusterListenAddress("127.0.0.1");
ncConfig.setDataListenAddress("127.0.0.1");
ncConfig.setResultListenAddress("127.0.0.1");
- ncConfig.setIODevices(new String [] { ioDev.getAbsolutePath() });
+ ncConfig.setIODevices(new String[] { ioDev.getAbsolutePath() });
asterixNCs[i] = new NodeControllerService(ncConfig);
asterixNCs[i].start();
}
@@ -138,7 +138,7 @@
hcc.cancelJob(jobId);
}
- protected void runTest(JobSpecification spec) throws Exception {
+ protected void runTest(JobSpecification spec, String expectedErrorMessage) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(spec.toJSON().asText());
}
@@ -180,14 +180,29 @@
try {
bbis.close();
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
-
readSize = reader.read(resultFrame);
}
}
- hcc.waitForCompletion(jobId);
+ boolean expectedExceptionThrown = false;
+ try {
+ hcc.waitForCompletion(jobId);
+ } catch (HyracksDataException hde) {
+ if (expectedErrorMessage != null) {
+ if (hde.toString().contains(expectedErrorMessage)) {
+ expectedExceptionThrown = true;
+ } else {
+ throw hde;
+ }
+ } else {
+ throw hde;
+ }
+ }
+ if (expectedErrorMessage != null && !expectedExceptionThrown) {
+ throw new Exception("Expected error (" + expectedErrorMessage + ") was not thrown");
+ }
dumpOutputFiles();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 871109a..13a103e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -48,7 +48,7 @@
spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
spec.addRoot(sinkOpDesc);
try {
- runTest(spec);
+ runTest(spec, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
} catch (Exception e) {
e.printStackTrace();
throw e;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index e92113a..67642f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -69,15 +69,15 @@
public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest {
- final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
- new FileSplit[] { new ManagedFileSplit(ASTERIX_IDS[0], "data" + File.separator + "tpch0.001"
- + File.separator + "lineitem.tbl"),
- new ManagedFileSplit(ASTERIX_IDS[1], "data" + File.separator + "tpch0.001" + File.separator
- + "lineitem.tbl"),
- new ManagedFileSplit(ASTERIX_IDS[2], "data" + File.separator + "tpch0.001" + File.separator
- + "lineitem.tbl"),
- new ManagedFileSplit(ASTERIX_IDS[3], "data" + File.separator + "tpch0.001" + File.separator
- + "lineitem.tbl") });
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+ new ManagedFileSplit(ASTERIX_IDS[0],
+ "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"),
+ new ManagedFileSplit(ASTERIX_IDS[1],
+ "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"),
+ new ManagedFileSplit(ASTERIX_IDS[2],
+ "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"),
+ new ManagedFileSplit(ASTERIX_IDS[3],
+ "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl") });
final int fileSize = 800 * 1024 * 4;
@@ -112,8 +112,8 @@
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner =
+ new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, desc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002",
"asterix-003", "asterix-004");
@@ -163,7 +163,7 @@
spec.connect(conn2, grouper, 0, printer, 0);
spec.addRoot(printer);
- runTest(spec);
+ runTest(spec, null);
}
/**
@@ -177,8 +177,8 @@
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner =
+ new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, desc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002",
"asterix-003", "asterix-004");
@@ -221,7 +221,7 @@
spec.connect(conn2, grouper, 0, printer, 0);
spec.addRoot(printer);
- runTest(spec);
+ runTest(spec, null);
}
private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix)
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
index f814cd5..d704671 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
@@ -38,6 +38,7 @@
private static AtomicInteger createPushRuntime = new AtomicInteger();
private static AtomicInteger initializeCounter = new AtomicInteger();
private static AtomicInteger openCloseCounter = new AtomicInteger();
+ public static final String ERROR_MESSAGE = "I throw exceptions";
private final int[] exceptionPartitions;
private final boolean sleepOnInitialize;
@@ -56,7 +57,7 @@
if (exceptionPartitions != null) {
for (int p : exceptionPartitions) {
if (p == partition) {
- throw new HyracksDataException("I throw exceptions");
+ throw new HyracksDataException(ERROR_MESSAGE);
}
}
}