Optimized thread usage in CC. Reduced sizes of critical sections in PartitionCollector

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1067 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 9a443d9..ba3b41c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.control.cc.scheduler;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -23,7 +22,6 @@
 import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -435,31 +433,26 @@
     }
 
     private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
-        Executor executor = ccs.getExecutor();
         final JobId jobId = jobRun.getJobId();
         final JobActivityGraph jag = jobRun.getJobActivityGraph();
         final String appName = jag.getApplicationName();
         final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
-        for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
-            String nodeId = e.getKey();
-            final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
+        for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
+            String nodeId = entry.getKey();
+            final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
             final NodeControllerState node = ccs.getNodeMap().get(nodeId);
             if (node != null) {
                 node.getActiveJobIds().add(jobRun.getJobId());
                 jobRun.getParticipatingNodeIds().add(nodeId);
-                executor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
-                                    taskDescriptors, connectorPolicies);
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                });
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
+                }
+                try {
+                    node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
+                            taskDescriptors, connectorPolicies);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
             }
         }
     }
@@ -495,21 +488,18 @@
         }
         final JobId jobId = jobRun.getJobId();
         LOGGER.fine("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
-        for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
-            final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
-            final List<TaskAttemptId> abortTaskAttempts = e.getValue();
+        for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) {
+            final NodeControllerState node = ccs.getNodeMap().get(entry.getKey());
+            final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
             if (node != null) {
-                LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + e.getKey());
-                ccs.getExecutor().execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            node.getNodeController().abortTasks(jobId, abortTaskAttempts);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                });
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
+                }
+                try {
+                    node.getNodeController().abortTasks(jobId, abortTaskAttempts);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
             }
         }
         inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
index 15ba75c..af89489 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.control.common.job;
 
 import java.io.Serializable;
+import java.util.Arrays;
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 
@@ -51,4 +52,11 @@
     public int[] getOutputPartitionCounts() {
         return nOutputPartitions;
     }
+
+    @Override
+    public String toString() {
+        return "TaskAttemptDescriptor[taId = " + taId + ", nPartitions = " + nPartitions + ", nInputPartitions = "
+                + Arrays.toString(nInputPartitions) + ", nOutputPartitions = " + Arrays.toString(nOutputPartitions)
+                + "]";
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 07d8ad7..cfa0aaa 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -20,7 +20,6 @@
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Map;
-import java.util.concurrent.Executor;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
@@ -140,10 +139,6 @@
         }
     }
 
-    public Executor getExecutor() {
-        return nodeController.getExecutor();
-    }
-
     public synchronized void notifyTaskComplete(Task task) throws Exception {
         taskMap.remove(task.getTaskAttemptId());
         try {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 68061f1..daeb4d8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -183,7 +183,7 @@
 
     public void start() throws HyracksException {
         aborted = false;
-        joblet.getExecutor().execute(this);
+        executor.execute(this);
     }
 
     public synchronized void abort() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index 7bebb53..dce7c13 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -50,15 +50,6 @@
         if (joblet != null) {
             joblet.cleanup(status);
         }
-        ncs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
+        ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
index d5f4fd8..af2670b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
@@ -172,44 +172,42 @@
 
         @Override
         public void notifyFailure(IInputChannel channel) {
+            PartitionId pid = (PartitionId) channel.getAttachment();
+            int senderIndex = pid.getSenderIndex();
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Failure: " + connectorId + " sender: " + senderIndex + " receiver: " + receiverIndex);
+            }
             synchronized (NonDeterministicPartitionCollector.this) {
-                PartitionId pid = (PartitionId) channel.getAttachment();
-                int senderIndex = pid.getSenderIndex();
                 failSenders.set(senderIndex);
                 eosSenders.set(senderIndex);
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("Failure: " + connectorId + " sender: " + senderIndex + " receiver: "
-                            + receiverIndex);
-                }
                 NonDeterministicPartitionCollector.this.notifyAll();
             }
         }
 
         @Override
         public void notifyDataAvailability(IInputChannel channel, int nFrames) {
+            PartitionId pid = (PartitionId) channel.getAttachment();
+            int senderIndex = pid.getSenderIndex();
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Data available: " + connectorId + " sender: " + senderIndex + " receiver: "
+                        + receiverIndex);
+            }
             synchronized (NonDeterministicPartitionCollector.this) {
-                PartitionId pid = (PartitionId) channel.getAttachment();
-                int senderIndex = pid.getSenderIndex();
                 availableFrameCounts[senderIndex] += nFrames;
                 frameAvailability.set(senderIndex);
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("Data available: " + connectorId + " sender: " + senderIndex + " receiver: "
-                            + receiverIndex);
-                }
                 NonDeterministicPartitionCollector.this.notifyAll();
             }
         }
 
         @Override
         public void notifyEndOfStream(IInputChannel channel) {
+            PartitionId pid = (PartitionId) channel.getAttachment();
+            int senderIndex = pid.getSenderIndex();
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("EOS: " + connectorId + " sender: " + senderIndex + " receiver: " + receiverIndex);
+            }
             synchronized (NonDeterministicPartitionCollector.this) {
-                PartitionId pid = (PartitionId) channel.getAttachment();
-                int senderIndex = pid.getSenderIndex();
                 eosSenders.set(senderIndex);
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("EOS: " + connectorId + " sender: " + senderIndex + " receiver: "
-                            + receiverIndex);
-                }
                 NonDeterministicPartitionCollector.this.notifyAll();
             }
         }