[ASTERIXDB-1982][FAIL] Unify runtime error reporting

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Unify error reporting in result distribution and operator pipeline;
  The fact that ASTERIXDB-1982 is sporadic is because the order of
  DatasetDirectoryService.reportJobFailure(...) and
  DatasetDirectoryService.reportResultPartitionFailure(...)
  is not deterministic and the latter can override the former;
- Make the order of setException and addWaiter irrelevant to
  avoid sporadically slipped exception;
- Fix the exception list in Task to be thread-safe.

Change-Id: I36f243c98876ff40e2539ca9241ff6d19fee929f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1883
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index f79ce53..3119ddd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -21,7 +21,6 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.JobId;
 
 public interface IDatasetPartitionManager extends IDatasetManager {
@@ -34,8 +33,6 @@
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
             throws HyracksException;
 
-    public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
-
     public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
             throws HyracksException;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 7fdf106..def4c83 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 
@@ -213,9 +214,9 @@
             cancelTasks(tasks, startSemaphore, completeSemaphore);
             Thread.currentThread().interrupt();
             throw HyracksDataException.create(e);
-        } catch (Exception e) {
+        } catch (ExecutionException e) {
             cancelTasks(tasks, startSemaphore, completeSemaphore);
-            throw HyracksDataException.create(e);
+            throw HyracksDataException.create(e.getCause());
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 53d7620..350984c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -36,7 +36,6 @@
 import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
 import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
 import org.apache.hyracks.control.cc.work.ReportProfilesWork;
-import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
 import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
 import org.apache.hyracks.control.cc.work.TaskCompleteWork;
 import org.apache.hyracks.control.cc.work.TaskFailureWork;
@@ -130,12 +129,6 @@
                 ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs,
                         rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition()));
                 break;
-            case REPORT_RESULT_PARTITION_FAILURE:
-                CCNCFunctions.ReportResultPartitionFailureFunction rrpf =
-                        (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
-                ccs.getWorkQueue().schedule(new ReportResultPartitionFailureWork(ccs,
-                        rrpf.getJobId(), rrpf.getResultSetId(), rrpf.getPartition()));
-                break;
             case SEND_APPLICATION_MESSAGE:
                 CCNCFunctions.SendApplicationMessageFunction rsf =
                         (CCNCFunctions.SendApplicationMessageFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 3cc41c5..8401fcf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -149,16 +149,6 @@
     }
 
     @Override
-    public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        if (djr != null) {
-            djr.fail(rsId, partition);
-        }
-        jobResultLocations.get(jobId).setException(new Exception());
-        notifyAll();
-    }
-
-    @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
         DatasetJobRecord djr = getDatasetJobRecord(jobId);
         if (djr != null) {
@@ -270,6 +260,7 @@
 
     private DatasetJobRecord record;
     private Waiters waiters;
+    private Exception exception;
 
     JobResultInfo(DatasetJobRecord record, Waiters waiters) {
         this.record = record;
@@ -286,6 +277,10 @@
             waiters = new Waiters();
         }
         waiters.put(rsId, new Waiter(knownRecords, callback));
+        if (exception != null) {
+            // Exception was set before the waiter is added.
+            setException(exception);
+        }
     }
 
     Waiter removeWaiter(ResultSetId rsId) {
@@ -302,6 +297,8 @@
                 waiters.remove(rsId).callback.setException(exception);
             }
         }
+        // Caches the exception anyway for future added waiters.
+        this.exception = exception;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 663a53a..68d6c16 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -41,8 +41,6 @@
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition)
             throws HyracksDataException;
 
-    public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
-
     public void reportJobFailure(JobId jobId, List<Exception> exceptions);
 
     public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
deleted file mode 100644
index 2de89db..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.work.AbstractWork;
-
-public class ReportResultPartitionFailureWork extends AbstractWork {
-    private final ClusterControllerService ccs;
-
-    private final JobId jobId;
-
-    private final ResultSetId rsId;
-
-    private final int partition;
-
-    public ReportResultPartitionFailureWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, int partition) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-        this.rsId = rsId;
-        this.partition = partition;
-    }
-
-    @Override
-    public void run() {
-        ccs.getDatasetDirectoryService().reportResultPartitionFailure(jobId, rsId, partition);
-    }
-
-    @Override
-    public String toString() {
-        return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 4159594..ec8e045 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -69,8 +69,6 @@
 
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
 
-    public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception;
-
     public void getNodeControllerInfos() throws Exception;
 
     public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 620033c..d42c4a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -83,7 +83,6 @@
         REGISTER_PARTITION_REQUEST,
         REGISTER_RESULT_PARTITION_LOCATION,
         REPORT_RESULT_PARTITION_WRITE_COMPLETION,
-        REPORT_RESULT_PARTITION_FAILURE,
 
         NODE_REGISTRATION_RESULT,
         START_TASKS,
@@ -640,39 +639,6 @@
         }
     }
 
-    public static class ReportResultPartitionFailureFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-
-        private final ResultSetId rsId;
-
-        private final int partition;
-
-        public ReportResultPartitionFailureFunction(JobId jobId, ResultSetId rsId, int partition) {
-            this.jobId = jobId;
-            this.rsId = rsId;
-            this.partition = partition;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REPORT_RESULT_PARTITION_FAILURE;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public ResultSetId getResultSetId() {
-            return rsId;
-        }
-
-        public int getPartition() {
-            return partition;
-        }
-    }
-
     public static class NodeRegistrationResult extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 31cb855..4707487 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -150,13 +150,6 @@
     }
 
     @Override
-    public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception {
-        ReportResultPartitionFailureFunction fn = new ReportResultPartitionFailureFunction(
-                jobId, rsId, partition);
-        ensureIpcHandle().send(-1, fn, null);
-    }
-
-    @Override
     public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception {
         ReportDistributedJobFailureFunction fn = new ReportDistributedJobFailureFunction(
                 jobId, nodeId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index ad4881a..74a628d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -22,13 +22,13 @@
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.logging.Level;
@@ -121,7 +121,7 @@
         opEnv = joblet.getEnvironment();
         partitionSendProfile = new Hashtable<>();
         pendingThreads = new LinkedHashSet<>();
-        exceptions = new ArrayList<>();
+        exceptions = new CopyOnWriteArrayList<>(); // Multiple threads could add exceptions to this list.
         this.ncs = ncs;
         this.inputChannelsFromConnectors = inputChannelsFromConnectors;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index c7b563b..361ee37 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -111,17 +111,6 @@
     }
 
     @Override
-    public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
-        try {
-            LOGGER.info("Reporting partition failure: JobId: " + jobId + " ResultSetId: " + rsId + " partition: "
-                    + partition);
-            ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    @Override
     public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition,
             IFrameWriter writer) throws HyracksException {
         ResultState resultState = getResultState(jobId, resultSetId, partition);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 9ca2d68..81f5551 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -103,7 +103,6 @@
             resultState.closeAndDelete();
             resultState.abort();
             registerResultPartitionLocation(false);
-            manager.reportPartitionFailure(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
         }