Optimized thread usage in the CC and NC by removing per-call executor hand-offs. Reduced the sizes of critical sections in NonDeterministicPartitionCollector.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1067 123451ca-8445-de46-9d55-352943316053
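
Taken together, the hunks below apply two patterns: remote calls that used to be wrapped in a Runnable and queued on a shared Executor are now issued directly on the calling thread, and the partition collector's channel callbacks do their attachment reads and FINE logging before entering the synchronized block. A minimal, self-contained sketch of the first pattern, with hypothetical names (RemoteStub, startTasks) standing in for the actual Hyracks interfaces; the same shape recurs in the JobScheduler and CleanupJobletWork hunks below:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class DirectDispatchSketch {
        interface RemoteStub {
            void startTasks(String payload) throws Exception;
        }

        // Before: each remote call was wrapped in a Runnable and queued on a shared
        // executor, paying for a task allocation and a thread hand-off per call.
        static void dispatchViaExecutor(final RemoteStub stub, final String payload, ExecutorService executor) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        stub.startTasks(payload);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        // After: the call is issued inline on the calling thread.
        static void dispatchInline(RemoteStub stub, String payload) {
            try {
                stub.startTasks(payload);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            RemoteStub stub = new RemoteStub() {
                @Override
                public void startTasks(String payload) {
                    System.out.println("startTasks(" + payload + ")");
                }
            };
            ExecutorService executor = Executors.newSingleThreadExecutor();
            dispatchViaExecutor(stub, "job-1", executor);
            dispatchInline(stub, "job-2");
            executor.shutdown();
        }
    }
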
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 9a443d9..ba3b41c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.control.cc.scheduler;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -23,7 +22,6 @@
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -435,31 +433,26 @@
}
private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
- Executor executor = ccs.getExecutor();
final JobId jobId = jobRun.getJobId();
final JobActivityGraph jag = jobRun.getJobActivityGraph();
final String appName = jag.getApplicationName();
final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
- for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
- String nodeId = e.getKey();
- final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
+ for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
+ String nodeId = entry.getKey();
+ final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
final NodeControllerState node = ccs.getNodeMap().get(nodeId);
if (node != null) {
node.getActiveJobIds().add(jobRun.getJobId());
jobRun.getParticipatingNodeIds().add(nodeId);
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
- taskDescriptors, connectorPolicies);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
+ }
+ try {
+ node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
+ taskDescriptors, connectorPolicies);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
}
@@ -495,21 +488,18 @@
}
final JobId jobId = jobRun.getJobId();
LOGGER.fine("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
- for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
- final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
- final List<TaskAttemptId> abortTaskAttempts = e.getValue();
+ for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) {
+ final NodeControllerState node = ccs.getNodeMap().get(entry.getKey());
+ final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
if (node != null) {
- LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + e.getKey());
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- node.getNodeController().abortTasks(jobId, abortTaskAttempts);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
+ }
+ try {
+ node.getNodeController().abortTasks(jobId, abortTaskAttempts);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
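
With the Runnable wrappers gone, serializing the JobActivityGraph and issuing the startTasks/abortTasks RPCs now happen on the thread driving the scheduler rather than on a CC executor thread. Both hunks also put their FINE statements behind an isLoggable() guard; a small sketch (hypothetical class, same java.util.logging calls) of what the guard buys: the message string, including the potentially large rendering of taskDescriptors, is never built when FINE is disabled.

    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class GuardedLoggingSketch {
        private static final Logger LOGGER = Logger.getLogger(GuardedLoggingSketch.class.getName());

        static void logStart(Object taskDescriptors, String nodeId) {
            // Without the guard, the concatenation (and taskDescriptors.toString())
            // would run on every call even when FINE logging is turned off.
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.fine("Starting: " + taskDescriptors + " at " + nodeId);
            }
        }
    }
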
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
index 15ba75c..af89489 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.control.common.job;
import java.io.Serializable;
+import java.util.Arrays;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
@@ -51,4 +52,11 @@
public int[] getOutputPartitionCounts() {
return nOutputPartitions;
}
+
+ @Override
+ public String toString() {
+ return "TaskAttemptDescriptor[taId = " + taId + ", nPartitions = " + nPartitions + ", nInputPartitions = "
+ + Arrays.toString(nInputPartitions) + ", nOutputPartitions = " + Arrays.toString(nOutputPartitions)
+ + "]";
+ }
}
\ No newline at end of file
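
The added toString() is what makes the new "Starting: ..." FINE message in JobScheduler readable. For a descriptor with two inputs and one output, the rendered form looks roughly like this (illustrative values; the taId part depends on TaskAttemptId.toString()):

    TaskAttemptDescriptor[taId = <task attempt id>, nPartitions = 2, nInputPartitions = [2, 2], nOutputPartitions = [2]]
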
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 07d8ad7..cfa0aaa 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
-import java.util.concurrent.Executor;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
@@ -140,10 +139,6 @@
}
}
- public Executor getExecutor() {
- return nodeController.getExecutor();
- }
-
public synchronized void notifyTaskComplete(Task task) throws Exception {
taskMap.remove(task.getTaskAttemptId());
try {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 68061f1..daeb4d8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -183,7 +183,7 @@
public void start() throws HyracksException {
aborted = false;
- joblet.getExecutor().execute(this);
+ executor.execute(this);
}
public synchronized void abort() {
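
The Joblet and Task hunks above go together: Joblet stops proxying the node controller's executor, and Task.start() submits the task through an Executor reference the Task itself holds. Where that reference is set is outside this diff; a sketch of the presumed shape, with a hypothetical constructor wiring the executor in:

    import java.util.concurrent.Executor;

    public class TaskSketch implements Runnable {
        // Assumed: the executor is handed to the task when it is created, so start()
        // no longer needs to reach back through the Joblet.
        private final Executor executor;
        private volatile boolean aborted;

        TaskSketch(Executor executor) {
            this.executor = executor;
        }

        public void start() {
            aborted = false;
            executor.execute(this);
        }

        @Override
        public void run() {
            if (aborted) {
                return;
            }
            // task body elided in this sketch
        }
    }
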
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index 7bebb53..dce7c13 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -50,15 +50,6 @@
if (joblet != null) {
joblet.cleanup(status);
}
- ncs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- });
+ ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
}
}
\ No newline at end of file
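
Invoking notifyJobletCleanup(jobId, ncs.getId()) directly also drops the RuntimeException re-wrapping: inside a Runnable the checked exception could not escape run(), while the enclosing work method can simply let it propagate (assuming it declares Exception, which this hunk does not show). A small illustration with hypothetical names:

    public class CleanupNotificationSketch {
        // Hypothetical stand-in for the cluster controller's notifyJobletCleanup(...).
        static void notifyCleanup() throws Exception {
        }

        // Before: run() cannot throw a checked exception, so it had to be re-wrapped.
        static Runnable queued() {
            return new Runnable() {
                @Override
                public void run() {
                    try {
                        notifyCleanup();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        }

        // After: invoked directly from a method that declares the exception.
        static void direct() throws Exception {
            notifyCleanup();
        }
    }
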
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
index d5f4fd8..af2670b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
@@ -172,44 +172,42 @@
@Override
public void notifyFailure(IInputChannel channel) {
+ PartitionId pid = (PartitionId) channel.getAttachment();
+ int senderIndex = pid.getSenderIndex();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Failure: " + connectorId + " sender: " + senderIndex + " receiver: " + receiverIndex);
+ }
synchronized (NonDeterministicPartitionCollector.this) {
- PartitionId pid = (PartitionId) channel.getAttachment();
- int senderIndex = pid.getSenderIndex();
failSenders.set(senderIndex);
eosSenders.set(senderIndex);
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Failure: " + connectorId + " sender: " + senderIndex + " receiver: "
- + receiverIndex);
- }
NonDeterministicPartitionCollector.this.notifyAll();
}
}
@Override
public void notifyDataAvailability(IInputChannel channel, int nFrames) {
+ PartitionId pid = (PartitionId) channel.getAttachment();
+ int senderIndex = pid.getSenderIndex();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Data available: " + connectorId + " sender: " + senderIndex + " receiver: "
+ + receiverIndex);
+ }
synchronized (NonDeterministicPartitionCollector.this) {
- PartitionId pid = (PartitionId) channel.getAttachment();
- int senderIndex = pid.getSenderIndex();
availableFrameCounts[senderIndex] += nFrames;
frameAvailability.set(senderIndex);
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Data available: " + connectorId + " sender: " + senderIndex + " receiver: "
- + receiverIndex);
- }
NonDeterministicPartitionCollector.this.notifyAll();
}
}
@Override
public void notifyEndOfStream(IInputChannel channel) {
+ PartitionId pid = (PartitionId) channel.getAttachment();
+ int senderIndex = pid.getSenderIndex();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("EOS: " + connectorId + " sender: " + senderIndex + " receiver: " + receiverIndex);
+ }
synchronized (NonDeterministicPartitionCollector.this) {
- PartitionId pid = (PartitionId) channel.getAttachment();
- int senderIndex = pid.getSenderIndex();
eosSenders.set(senderIndex);
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("EOS: " + connectorId + " sender: " + senderIndex + " receiver: "
- + receiverIndex);
- }
NonDeterministicPartitionCollector.this.notifyAll();
}
}
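
All three callbacks above now share the same shape: the channel attachment is read and the FINE message is built before the monitor is taken, and the synchronized block shrinks to the BitSet/counter updates plus notifyAll(). This is safe because the attachment and the message string are local to the calling thread; only the shared bit sets and frame counts need the lock. A minimal sketch of that shape, with hypothetical fields standing in for the collector's state:

    import java.util.BitSet;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class NarrowedCriticalSectionSketch {
        private static final Logger LOGGER = Logger.getLogger(NarrowedCriticalSectionSketch.class.getName());
        private final BitSet eosSenders = new BitSet();

        // Hypothetical callback mirroring notifyEndOfStream(): thread-local work runs
        // outside the lock, shared-state mutation and the wake-up stay inside it.
        public void onEndOfStream(int senderIndex) {
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.fine("EOS from sender " + senderIndex);
            }
            synchronized (this) {
                eosSenders.set(senderIndex);
                notifyAll();
            }
        }
    }
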