Set Job Scheduler log level to FINE
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1003 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 5e11470..9a443d9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -184,8 +184,8 @@
private void startRunnableActivityClusters() throws HyracksException {
Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
findRunnableTaskClusterRoots(taskClusterRoots, rootActivityClusters);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
+ inProgressTaskClusters);
}
if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
@@ -213,19 +213,19 @@
queue.add(new RankedRunnableTaskCluster(priority, tc));
}
}
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ranked TCs: " + queue);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Ranked TCs: " + queue);
}
Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
for (RankedRunnableTaskCluster rrtc : queue) {
TaskCluster tc = rrtc.getTaskCluster();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Found runnable TC: " + tc);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Found runnable TC: " + tc);
List<TaskClusterAttempt> attempts = tc.getAttempts();
- LOGGER.info("Attempts so far:" + attempts.size());
+ LOGGER.fine("Attempts so far:" + attempts.size());
for (TaskClusterAttempt tcAttempt : attempts) {
- LOGGER.info("Status: " + tcAttempt.getStatus());
+ LOGGER.fine("Status: " + tcAttempt.getStatus());
}
}
assignTaskLocations(tc, taskAttemptMap);
@@ -245,16 +245,16 @@
* Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _}
*/
private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Computing runnability: " + goal);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Computing runnability: " + goal);
}
if (runnabilityMap.containsKey(goal)) {
return runnabilityMap.get(goal);
}
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
if (lastAttempt != null) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Last Attempt Status: " + lastAttempt.getStatus());
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Last Attempt Status: " + lastAttempt.getStatus());
}
if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
@@ -271,15 +271,15 @@
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
for (PartitionId pid : goal.getRequiredPartitions()) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Inspecting required partition: " + pid);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Inspecting required partition: " + pid);
}
Runnability runnability;
ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
PartitionState maxState = pmm.getMaximumAvailableState(pid);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Policy: " + cPolicy + " maxState: " + maxState);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Policy: " + cPolicy + " maxState: " + maxState);
}
if (PartitionState.COMMITTED.equals(maxState)) {
runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
@@ -313,8 +313,8 @@
// already not runnable -- cannot get better. bail.
break;
}
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("aggregateRunnability: " + aggregateRunnability);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("aggregateRunnability: " + aggregateRunnability);
}
}
runnabilityMap.put(goal, aggregateRunnability);
@@ -474,14 +474,14 @@
}
private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
- LOGGER.info("Aborting task cluster: " + tcAttempt.getAttempt());
+ LOGGER.fine("Aborting task cluster: " + tcAttempt.getAttempt());
Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
for (TaskAttempt ta : tcAttempt.getTaskAttempts()) {
TaskAttemptId taId = ta.getTaskAttemptId();
TaskAttempt.TaskStatus status = ta.getStatus();
abortTaskIds.add(taId);
- LOGGER.info("Checking " + taId + ": " + ta.getStatus());
+ LOGGER.fine("Checking " + taId + ": " + ta.getStatus());
if (status == TaskAttempt.TaskStatus.RUNNING || status == TaskAttempt.TaskStatus.COMPLETED) {
ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
ta.setEndTime(System.currentTimeMillis());
@@ -494,12 +494,12 @@
}
}
final JobId jobId = jobRun.getJobId();
- LOGGER.info("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
+ LOGGER.fine("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
final List<TaskAttemptId> abortTaskAttempts = e.getValue();
if (node != null) {
- LOGGER.info("Aborting: " + abortTaskAttempts + " at " + e.getKey());
+ LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + e.getKey());
ccs.getExecutor().execute(new Runnable() {
@Override
public void run() {
@@ -613,12 +613,12 @@
*/
public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, String details) {
try {
- LOGGER.info("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
+ LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTask().getTaskCluster();
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
- LOGGER.info("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
+ LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
ta.setStatus(TaskAttempt.TaskStatus.FAILED, details);
abortTaskCluster(lastAttempt);
lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
diff --git a/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
index 6052443..c09f7b4 100644
--- a/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
+++ b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
@@ -19,6 +19,9 @@
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Assert;
import org.junit.Test;
@@ -32,17 +35,13 @@
public class NetTest {
@Test
public void test() throws Exception {
- MuxDemux md1 = createMuxDemux("md1");
- md1.start();
- InetSocketAddress md1Address = md1.getLocalAddress();
- System.err.println("md1Address: " + md1Address);
+ AtomicBoolean failFlag = new AtomicBoolean();
- MuxDemux md2 = createMuxDemux("md2");
+ MuxDemux md1 = createMuxDemux("md1", failFlag);
+ md1.start();
+ MuxDemux md2 = createMuxDemux("md2", failFlag);
md2.start();
InetSocketAddress md2Address = md2.getLocalAddress();
- System.err.println("md2Address: " + md2Address);
-
- System.err.println("Started");
MultiplexedConnection md1md2 = md1.connect(md2Address);
@@ -53,6 +52,8 @@
t1.join();
t2.join();
+
+ Assert.assertFalse("Failure flag was set to true", failFlag.get());
}
private Thread createThread(final MultiplexedConnection md1md2, final int factor) {
@@ -104,12 +105,11 @@
}
- private MuxDemux createMuxDemux(final String label) {
+ private MuxDemux createMuxDemux(final String label, final AtomicBoolean failFlag) {
IChannelOpenListener md1OpenListener = new IChannelOpenListener() {
@Override
public void channelOpened(final ChannelControlBlock channel) {
final ChannelIO cio = new ChannelIO(label, channel);
- System.err.println(label + ": Channel Opened");
channel.getReadInterface().setFullBufferAcceptor(cio.rifba);
channel.getWriteInterface().setEmptyBufferAcceptor(cio.wieba);
@@ -118,6 +118,8 @@
rieba.accept(ByteBuffer.allocate(1024));
}
new Thread() {
+ private int prevTotal = 0;
+
@Override
public void run() {
while (true) {
@@ -136,7 +138,6 @@
throw new RuntimeException("Error: " + cio.ecode);
} else if (cio.eos) {
channel.getWriteInterface().getFullBufferAcceptor().close();
- System.err.println("Channel Closed");
return;
}
}
@@ -144,9 +145,14 @@
while (fbuf.remaining() > 0) {
counter += fbuf.getInt();
}
+ if (prevTotal != 0) {
+ if (Math.abs(counter - prevTotal) != 256) {
+ failFlag.set(true);
+ }
+ }
+ prevTotal = counter;
fbuf.compact();
rieba.accept(fbuf);
- System.err.println("Received: total: " + counter);
}
}
}.start();