ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable

Change-Id: Ie5994f8a51dcf43e42325e89215758c310cd7b99
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1681
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 4c0eb1b..c9cdb2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -20,6 +20,8 @@
 package org.apache.hyracks.api.exceptions;
 
 import java.io.Serializable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
 
@@ -29,11 +31,17 @@
 public class HyracksDataException extends HyracksException {
 
     private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(HyracksDataException.class.getName());
 
     public static HyracksDataException create(Throwable cause) {
         if (cause instanceof HyracksDataException || cause == null) {
             return (HyracksDataException) cause;
         }
+        if (cause instanceof InterruptedException && !Thread.currentThread().isInterrupted()) {
+            LOGGER.log(Level.WARNING,
+                    "Wrapping an InterruptedException in HyracksDataException and current thread is not interrupted",
+                    cause);
+        }
         return new HyracksDataException(cause);
     }
 
@@ -46,8 +54,8 @@
     }
 
     public static HyracksDataException create(HyracksDataException e, String nodeId) {
-        return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId, e
-                .getParams());
+        return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId,
+                e.getParams());
     }
 
     public static HyracksDataException suppress(HyracksDataException root, Throwable th) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 1c4f916..eeaee04 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -20,14 +20,14 @@
 package org.apache.hyracks.api.rewriter.runtime;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Map.Entry;
+import java.util.Queue;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -43,12 +43,10 @@
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
  * connected activities in a single thread.
- *
- * @author yingyib
  */
 public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
-    private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
-    private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
+    private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>();
+    private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>();
     private final Map<ActivityId, IActivity> startActivities;
     private final SuperActivity parent;
     private final IHyracksTaskContext ctx;
@@ -56,7 +54,6 @@
     private final int partition;
     private final int nPartitions;
     private int inputArity = 0;
-    private boolean[] startedInitialization;
 
     public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
             IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -80,24 +77,25 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        // Initializes all OperatorNodePushables in parallel and then finally deinitializes them.
-        runInParallel((op, index) -> {
-            startedInitialization[index] = true;
-            op.initialize();
-        });
+        runInParallel(op -> op.initialize());
+    }
+
+    @Override
+    public void deinitialize() throws HyracksDataException {
+        runInParallel(op -> op.deinitialize());
     }
 
     private void init() throws HyracksDataException {
-        Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
-        Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
-        List<IConnectorDescriptor> outputConnectors = null;
+        Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<>();
+        Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
+        List<IConnectorDescriptor> outputConnectors;
 
         /**
          * Set up the source operators
          */
         for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
-            IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
-                    nPartitions);
+            IOperatorNodePushable opPushable =
+                    entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
             startOperatorNodePushables.put(entry.getKey(), opPushable);
             operatorNodePushablesBFSOrder.add(opPushable);
             operatorNodePushables.put(entry.getKey(), opPushable);
@@ -154,19 +152,6 @@
                 }
             }
         }
-
-        // Sets the startedInitialization flags to be false.
-        startedInitialization = new boolean[operatorNodePushablesBFSOrder.size()];
-        Arrays.fill(startedInitialization, false);
-    }
-
-    @Override
-    public void deinitialize() throws HyracksDataException {
-        runInParallel((op, index) -> {
-            if (startedInitialization[index]) {
-                op.deinitialize();
-            }
-        });
     }
 
     @Override
@@ -192,8 +177,7 @@
          */
         Pair<ActivityId, Integer> activityIdInputIndex = parent.getActivityIdInputIndex(index);
         IOperatorNodePushable operatorNodePushable = operatorNodePushables.get(activityIdInputIndex.getLeft());
-        IFrameWriter writer = operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
-        return writer;
+        return operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
     }
 
     @Override
@@ -201,31 +185,40 @@
         return "Super Activity " + parent.getActivityMap().values().toString();
     }
 
+    @FunctionalInterface
     interface OperatorNodePushableAction {
-        void runAction(IOperatorNodePushable op, int opIndex) throws HyracksDataException;
+        void run(IOperatorNodePushable op) throws HyracksDataException;
     }
 
-    private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException {
-        List<Future<Void>> initializationTasks = new ArrayList<>();
+    private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException {
+        List<Future<Void>> tasks = new ArrayList<>();
+        final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
+        final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
         try {
-            int index = 0;
-            // Run one action for all OperatorNodePushables in parallel through a thread pool.
             for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
-                final int opIndex = index++;
-                initializationTasks.add(ctx.getExecutorService().submit(() -> {
-                    opAction.runAction(op, opIndex);
+                tasks.add(ctx.getExecutorService().submit(() -> {
+                    startSemaphore.release();
+                    try {
+                        action.run(op);
+                    } finally {
+                        completeSemaphore.release();
+                    }
                     return null;
                 }));
             }
-            // Waits until all parallel actions to finish.
-            for (Future<Void> initializationTask : initializationTasks) {
-                initializationTask.get();
+            for (Future<Void> task : tasks) {
+                task.get();
             }
         } catch (Exception e) {
-            for (Future<Void> initializationTask : initializationTasks) {
-                initializationTask.cancel(true);
+            try {
+                startSemaphore.acquireUninterruptibly();
+                for (Future<Void> task : tasks) {
+                    task.cancel(true);
+                }
+            } finally {
+                completeSemaphore.acquireUninterruptibly();
             }
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }