Misc. Cleanup / InterruptedException Handling

Change-Id: I0059ec85f8376160bb40bad721f3a8e291ad8ac2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1775
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 0beee6f..ddba2d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -98,5 +98,9 @@
       <groupId>args4j</groupId>
       <artifactId>args4j</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
index 2f3c72b..28e098f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -65,7 +65,7 @@
              * extract start activities
              */
             List<IConnectorDescriptor> conns = getActivityInputMap().get(entry.getKey());
-            if (conns == null || conns.size() == 0) {
+            if (conns == null || conns.isEmpty()) {
                 startActivities.put(entry.getKey(), entry.getValue());
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index eeaee04..314bf8b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -20,6 +20,7 @@
 package org.apache.hyracks.api.rewriter.runtime;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -29,6 +30,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -64,7 +66,7 @@
         this.partition = partition;
         this.nPartitions = nPartitions;
 
-        /**
+        /*
          * initialize the writer-relationship for the internal DAG of operator
          * node pushables
          */
@@ -77,43 +79,39 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        runInParallel(op -> op.initialize());
+        runInParallel(IOperatorNodePushable::initialize);
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        runInParallel(op -> op.deinitialize());
+        runInParallel(IOperatorNodePushable::deinitialize);
     }
 
     private void init() throws HyracksDataException {
-        Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<>();
         Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
         List<IConnectorDescriptor> outputConnectors;
 
-        /**
+        /*
          * Set up the source operators
          */
         for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
-            IOperatorNodePushable opPushable =
-                    entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-            startOperatorNodePushables.put(entry.getKey(), opPushable);
+            IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
+                    nPartitions);
             operatorNodePushablesBFSOrder.add(opPushable);
             operatorNodePushables.put(entry.getKey(), opPushable);
             inputArity += opPushable.getInputArity();
-            outputConnectors = parent.getActivityOutputMap().get(entry.getKey());
-            if (outputConnectors != null) {
-                for (IConnectorDescriptor conn : outputConnectors) {
-                    childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
-                }
+            outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(),
+                    Collections.emptyList());
+            for (IConnectorDescriptor conn : outputConnectors) {
+                childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
             }
         }
 
-        /**
-         * Using BFS (breadth-first search) to construct to runtime execution
-         * DAG;
+        /*
+         * Using BFS (breadth-first search) to construct to runtime execution DAG...
          */
-        while (childQueue.size() > 0) {
-            /**
+        while (!childQueue.isEmpty()) {
+            /*
              * construct the source to destination information
              */
             Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> channel = childQueue.poll();
@@ -130,25 +128,23 @@
                 operatorNodePushables.put(destId, destOp);
             }
 
-            /**
+            /*
              * construct the dataflow connection from a producer to a consumer
              */
             sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
                     recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
 
-            /**
+            /*
              * traverse to the child of the current activity
              */
-            outputConnectors = parent.getActivityOutputMap().get(destId);
+            outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), destId, Collections.emptyList());
 
-            /**
+            /*
              * expend the executing activities further to the downstream
              */
-            if (outputConnectors != null && outputConnectors.size() > 0) {
-                for (IConnectorDescriptor conn : outputConnectors) {
-                    if (conn != null) {
-                        childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
-                    }
+            for (IConnectorDescriptor conn : outputConnectors) {
+                if (conn != null) {
+                    childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
                 }
             }
         }
@@ -162,7 +158,7 @@
     @Override
     public void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer, RecordDescriptor recordDesc)
             throws HyracksDataException {
-        /**
+        /*
          * set the right output frame writer
          */
         Pair<ActivityId, Integer> activityIdOutputIndex = parent.getActivityIdOutputIndex(clusterOutputIndex);
@@ -172,7 +168,7 @@
 
     @Override
     public IFrameWriter getInputFrameWriter(final int index) {
-        /**
+        /*
          * get the right IFrameWriter from the cluster input index
          */
         Pair<ActivityId, Integer> activityIdInputIndex = parent.getActivityIdInputIndex(index);
@@ -209,16 +205,24 @@
             for (Future<Void> task : tasks) {
                 task.get();
             }
-        } catch (Exception e) {
-            try {
-                startSemaphore.acquireUninterruptibly();
-                for (Future<Void> task : tasks) {
-                    task.cancel(true);
-                }
-            } finally {
-                completeSemaphore.acquireUninterruptibly();
-            }
+        } catch (InterruptedException e) {
+            cancelTasks(tasks, startSemaphore, completeSemaphore);
+            Thread.currentThread().interrupt();
             throw HyracksDataException.create(e);
+        } catch (Exception e) {
+            cancelTasks(tasks, startSemaphore, completeSemaphore);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) {
+        try {
+            startSemaphore.acquireUninterruptibly();
+            for (Future<Void> task : tasks) {
+                task.cancel(true);
+            }
+        } finally {
+            completeSemaphore.acquireUninterruptibly();
         }
     }
 }