[ASTERIXDB-1982][FAIL] Unify runtime error reporting
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Unify error reporting in result distribution and operator pipeline;
ASTERIXDB-1982 is sporadic because the order of
DatasetDirectoryService.reportJobFailure(...) and
DatasetDirectoryService.reportResultPartitionFailure(...)
is not deterministic, and the latter can override the former;
- Make the order of setException and addWaiter irrelevant, so that
an exception set before a waiter is added is no longer lost;
- Fix the exception list in Task to be thread-safe.
Change-Id: I36f243c98876ff40e2539ca9241ff6d19fee929f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1883
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index f79ce53..3119ddd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -21,7 +21,6 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
public interface IDatasetPartitionManager extends IDatasetManager {
@@ -34,8 +33,6 @@
public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
throws HyracksException;
- public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
-
public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
throws HyracksException;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 7fdf106..def4c83 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
@@ -213,9 +214,9 @@
cancelTasks(tasks, startSemaphore, completeSemaphore);
Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
- } catch (Exception e) {
+ } catch (ExecutionException e) {
cancelTasks(tasks, startSemaphore, completeSemaphore);
- throw HyracksDataException.create(e);
+ throw HyracksDataException.create(e.getCause());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 53d7620..350984c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -36,7 +36,6 @@
import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
import org.apache.hyracks.control.cc.work.ReportProfilesWork;
-import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
import org.apache.hyracks.control.cc.work.TaskCompleteWork;
import org.apache.hyracks.control.cc.work.TaskFailureWork;
@@ -130,12 +129,6 @@
ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs,
rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition()));
break;
- case REPORT_RESULT_PARTITION_FAILURE:
- CCNCFunctions.ReportResultPartitionFailureFunction rrpf =
- (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
- ccs.getWorkQueue().schedule(new ReportResultPartitionFailureWork(ccs,
- rrpf.getJobId(), rrpf.getResultSetId(), rrpf.getPartition()));
- break;
case SEND_APPLICATION_MESSAGE:
CCNCFunctions.SendApplicationMessageFunction rsf =
(CCNCFunctions.SendApplicationMessageFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 3cc41c5..8401fcf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -149,16 +149,6 @@
}
@Override
- public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
- if (djr != null) {
- djr.fail(rsId, partition);
- }
- jobResultLocations.get(jobId).setException(new Exception());
- notifyAll();
- }
-
- @Override
public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
DatasetJobRecord djr = getDatasetJobRecord(jobId);
if (djr != null) {
@@ -270,6 +260,7 @@
private DatasetJobRecord record;
private Waiters waiters;
+ private Exception exception;
JobResultInfo(DatasetJobRecord record, Waiters waiters) {
this.record = record;
@@ -286,6 +277,10 @@
waiters = new Waiters();
}
waiters.put(rsId, new Waiter(knownRecords, callback));
+ if (exception != null) {
+ // Exception was set before the waiter is added.
+ setException(exception);
+ }
}
Waiter removeWaiter(ResultSetId rsId) {
@@ -302,6 +297,8 @@
waiters.remove(rsId).callback.setException(exception);
}
}
+ // Caches the exception anyway for future added waiters.
+ this.exception = exception;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 663a53a..68d6c16 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -41,8 +41,6 @@
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition)
throws HyracksDataException;
- public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
-
public void reportJobFailure(JobId jobId, List<Exception> exceptions);
public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
deleted file mode 100644
index 2de89db..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.work.AbstractWork;
-
-public class ReportResultPartitionFailureWork extends AbstractWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final int partition;
-
- public ReportResultPartitionFailureWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, int partition) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.partition = partition;
- }
-
- @Override
- public void run() {
- ccs.getDatasetDirectoryService().reportResultPartitionFailure(jobId, rsId, partition);
- }
-
- @Override
- public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 4159594..ec8e045 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -69,8 +69,6 @@
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
- public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception;
-
public void getNodeControllerInfos() throws Exception;
public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 620033c..d42c4a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -83,7 +83,6 @@
REGISTER_PARTITION_REQUEST,
REGISTER_RESULT_PARTITION_LOCATION,
REPORT_RESULT_PARTITION_WRITE_COMPLETION,
- REPORT_RESULT_PARTITION_FAILURE,
NODE_REGISTRATION_RESULT,
START_TASKS,
@@ -640,39 +639,6 @@
}
}
- public static class ReportResultPartitionFailureFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final int partition;
-
- public ReportResultPartitionFailureFunction(JobId jobId, ResultSetId rsId, int partition) {
- this.jobId = jobId;
- this.rsId = rsId;
- this.partition = partition;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REPORT_RESULT_PARTITION_FAILURE;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public ResultSetId getResultSetId() {
- return rsId;
- }
-
- public int getPartition() {
- return partition;
- }
- }
-
public static class NodeRegistrationResult extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 31cb855..4707487 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -150,13 +150,6 @@
}
@Override
- public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception {
- ReportResultPartitionFailureFunction fn = new ReportResultPartitionFailureFunction(
- jobId, rsId, partition);
- ensureIpcHandle().send(-1, fn, null);
- }
-
- @Override
public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception {
ReportDistributedJobFailureFunction fn = new ReportDistributedJobFailureFunction(
jobId, nodeId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index ad4881a..74a628d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -22,13 +22,13 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
@@ -121,7 +121,7 @@
opEnv = joblet.getEnvironment();
partitionSendProfile = new Hashtable<>();
pendingThreads = new LinkedHashSet<>();
- exceptions = new ArrayList<>();
+ exceptions = new CopyOnWriteArrayList<>(); // Multiple threads could add exceptions to this list.
this.ncs = ncs;
this.inputChannelsFromConnectors = inputChannelsFromConnectors;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index c7b563b..361ee37 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -111,17 +111,6 @@
}
@Override
- public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
- try {
- LOGGER.info("Reporting partition failure: JobId: " + jobId + " ResultSetId: " + rsId + " partition: "
- + partition);
- ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- @Override
public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition,
IFrameWriter writer) throws HyracksException {
ResultState resultState = getResultState(jobId, resultSetId, partition);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 9ca2d68..81f5551 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -103,7 +103,6 @@
resultState.closeAndDelete();
resultState.abort();
registerResultPartitionLocation(false);
- manager.reportPartitionFailure(jobId, resultSetId, partition);
} catch (HyracksException e) {
throw new HyracksDataException(e);
}