Fix a potential hang in op.initialize().

- Ensure initialize()/deinitialize() always pair up: deinitialize() is only invoked on operators whose initialize() was started.

Change-Id: I701b271bc6dc78e67274fa845dec013756843a70
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1243
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 4ee8303..3e557a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -20,12 +20,13 @@
 package org.apache.hyracks.api.rewriter.runtime;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
@@ -56,6 +57,7 @@
     private final int partition;
     private final int nPartitions;
     private int inputArity = 0;
+    private boolean[] startedInitialization;
 
     public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
             IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -79,8 +81,11 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        // Initializes all OperatorNodePushables in parallel.
-        runInParallel(op -> op.initialize());
+        // Initializes all OperatorNodePushables in parallel; each is flagged as started before its initialize() call.
+        runInParallel((op, index) -> {
+            startedInitialization[index] = true;
+            op.initialize();
+        });
     }
 
     public void init() throws HyracksDataException {
@@ -150,12 +155,19 @@
                 }
             }
         }
+
+        // Sets the startedInitialization flags to be false.
+        startedInitialization = new boolean[operatorNodePushablesBFSOrder.size()];
+        Arrays.fill(startedInitialization, false);
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        // De-initialize all OperatorNodePushables in parallel.
-        runInParallel(op -> op.deinitialize());
+        runInParallel((op, index) -> {
+            if (startedInitialization[index]) {
+                op.deinitialize();
+            }
+        });
     }
 
     @Override
@@ -191,18 +203,21 @@
     }
 
     interface OperatorNodePushableAction {
-        public void runAction(IOperatorNodePushable op) throws HyracksDataException;
+        void runAction(IOperatorNodePushable op, int opIndex) throws HyracksDataException;
     }
 
-    private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException {
-        List<Future<Void>> initializationTasks = new ArrayList<Future<Void>>();
+    private void runInParallel(OperatorNodePushableAction opAction)
+            throws HyracksDataException {
+        List<Future<Void>> initializationTasks = new ArrayList<>();
         try {
+            int index = 0;
             // Run one action for all OperatorNodePushables in parallel through a thread pool.
             for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
+                final int opIndex = index++;
                 initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
-                        opAction.runAction(op);
+                        opAction.runAction(op, opIndex);
                         return null;
                     }
                 }));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index f463bfa..7c626c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -235,8 +235,12 @@
         }
     }
 
-    private synchronized void addPendingThread(Thread t) {
+    private synchronized boolean addPendingThread(Thread t) {
+        if (aborted) {
+            return false;
+        }
         pendingThreads.add(t);
+        return true;
     }
 
     private synchronized void removePendingThread(Thread t) {
@@ -256,9 +260,16 @@
     public void run() {
         Thread ct = Thread.currentThread();
         String threadName = ct.getName();
-        addPendingThread(ct);
+        ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
+        // Registers this thread through the synchronized addPendingThread(..) so that abort()
+        // cannot miss it when interrupting; registration fails if the task is already aborted.
+        if (!addPendingThread(ct)) {
+            exceptions.add(new InterruptedException("Task " + getTaskAttemptId() + " was aborted!"));
+            ExceptionUtils.setNodeIds(exceptions, ncs.getId());
+            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
+            return;
+        }
         try {
-            ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
             try {
                 operator.initialize();
                 if (collectors.length > 0) {
@@ -271,11 +282,12 @@
                         executorService.execute(new Runnable() {
                             @Override
                             public void run() {
-                                if (aborted) {
+                                Thread thread = Thread.currentThread();
+                                // Registers this thread via synchronized addPendingThread(..) so that abort()
+                                // cannot miss interrupting it; registration fails once the task is aborted.
+                                if (!addPendingThread(thread)) {
                                     return;
                                 }
-                                Thread thread = Thread.currentThread();
-                                addPendingThread(thread);
                                 String oldName = thread.getName();
                                 thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
                                 thread.setPriority(Thread.MIN_PRIORITY);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 452da88..b76e458 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -26,13 +26,18 @@
 import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
 import org.apache.hyracks.tests.util.ExceptionOnCreatePushRuntimeOperatorDescriptor;
 import org.junit.Assert;
+import org.junit.Test;
 
 public class JobFailureTest extends AbstractMultiNCIntegrationTest {
 
-    // commenting out due to intermittent hangs:
-    // https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/2877/artifact/target/threaddumps/jstack_28541.html
-    // @Test
+    @Test
     public void failureOnCreatePushRuntime() throws Exception {
+        for (int round = 0; round < 1000; ++round) {
+            execTest();
+        }
+    }
+
+    private void execTest() throws Exception {
         JobSpecification spec = new JobSpecification();
         AbstractSingleActivityOperatorDescriptor sourceOpDesc = new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec,
                 0, 1, new int[] { 4 }, true);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
index 8c5bf48..14c644a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
@@ -20,6 +20,8 @@
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -32,6 +34,7 @@
 
 public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
+    private static Logger LOGGER = Logger.getLogger(ExceptionOnCreatePushRuntimeOperatorDescriptor.class.getName());
     private static AtomicInteger createPushRuntime = new AtomicInteger();
     private static AtomicInteger initializeCounter = new AtomicInteger();
     private static AtomicInteger openCloseCounter = new AtomicInteger();
@@ -126,10 +129,10 @@
     public static boolean succeed() {
         boolean success = openCloseCounter.get() == 0 && createPushRuntime.get() == 0 && initializeCounter.get() == 0;
         if (!success) {
-            System.err.println("Failure:");
-            System.err.println("CreatePushRuntime:" + createPushRuntime.get());
-            System.err.println("InitializeCounter:" + initializeCounter.get());
-            System.err.println("OpenCloseCounter:" + openCloseCounter.get());
+            LOGGER.log(Level.SEVERE, "Failure:");
+            LOGGER.log(Level.SEVERE, "CreatePushRuntime:" + createPushRuntime.get());
+            LOGGER.log(Level.SEVERE, "InitializeCounter:" + initializeCounter.get());
+            LOGGER.log(Level.SEVERE, "OpenCloseCounter:" + openCloseCounter.get());
         }
         return success;
     }