Set Job Scheduler log level to FINE

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1003 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 5e11470..9a443d9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -184,8 +184,8 @@
     private void startRunnableActivityClusters() throws HyracksException {
         Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
         findRunnableTaskClusterRoots(taskClusterRoots, rootActivityClusters);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
                     + inProgressTaskClusters);
         }
         if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
@@ -213,19 +213,19 @@
                 queue.add(new RankedRunnableTaskCluster(priority, tc));
             }
         }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Ranked TCs: " + queue);
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Ranked TCs: " + queue);
         }
 
         Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
         for (RankedRunnableTaskCluster rrtc : queue) {
             TaskCluster tc = rrtc.getTaskCluster();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Found runnable TC: " + tc);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Found runnable TC: " + tc);
                 List<TaskClusterAttempt> attempts = tc.getAttempts();
-                LOGGER.info("Attempts so far:" + attempts.size());
+                LOGGER.fine("Attempts so far:" + attempts.size());
                 for (TaskClusterAttempt tcAttempt : attempts) {
-                    LOGGER.info("Status: " + tcAttempt.getStatus());
+                    LOGGER.fine("Status: " + tcAttempt.getStatus());
                 }
             }
             assignTaskLocations(tc, taskAttemptMap);
@@ -245,16 +245,16 @@
      * Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _} 
      */
     private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Computing runnability: " + goal);
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Computing runnability: " + goal);
         }
         if (runnabilityMap.containsKey(goal)) {
             return runnabilityMap.get(goal);
         }
         TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
         if (lastAttempt != null) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Last Attempt Status: " + lastAttempt.getStatus());
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Last Attempt Status: " + lastAttempt.getStatus());
             }
             if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
                 Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
@@ -271,15 +271,15 @@
         PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
         Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
         for (PartitionId pid : goal.getRequiredPartitions()) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Inspecting required partition: " + pid);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Inspecting required partition: " + pid);
             }
             Runnability runnability;
             ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
             IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
             PartitionState maxState = pmm.getMaximumAvailableState(pid);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Policy: " + cPolicy + " maxState: " + maxState);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Policy: " + cPolicy + " maxState: " + maxState);
             }
             if (PartitionState.COMMITTED.equals(maxState)) {
                 runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
@@ -313,8 +313,8 @@
                 // already not runnable -- cannot get better. bail.
                 break;
             }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("aggregateRunnability: " + aggregateRunnability);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("aggregateRunnability: " + aggregateRunnability);
             }
         }
         runnabilityMap.put(goal, aggregateRunnability);
@@ -474,14 +474,14 @@
     }
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
-        LOGGER.info("Aborting task cluster: " + tcAttempt.getAttempt());
+        LOGGER.fine("Aborting task cluster: " + tcAttempt.getAttempt());
         Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
         Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
         for (TaskAttempt ta : tcAttempt.getTaskAttempts()) {
             TaskAttemptId taId = ta.getTaskAttemptId();
             TaskAttempt.TaskStatus status = ta.getStatus();
             abortTaskIds.add(taId);
-            LOGGER.info("Checking " + taId + ": " + ta.getStatus());
+            LOGGER.fine("Checking " + taId + ": " + ta.getStatus());
             if (status == TaskAttempt.TaskStatus.RUNNING || status == TaskAttempt.TaskStatus.COMPLETED) {
                 ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
                 ta.setEndTime(System.currentTimeMillis());
@@ -494,12 +494,12 @@
             }
         }
         final JobId jobId = jobRun.getJobId();
-        LOGGER.info("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
+        LOGGER.fine("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
         for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
             final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
             final List<TaskAttemptId> abortTaskAttempts = e.getValue();
             if (node != null) {
-                LOGGER.info("Aborting: " + abortTaskAttempts + " at " + e.getKey());
+                LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + e.getKey());
                 ccs.getExecutor().execute(new Runnable() {
                     @Override
                     public void run() {
@@ -613,12 +613,12 @@
      */
     public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, String details) {
         try {
-            LOGGER.info("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
+            LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
             TaskAttemptId taId = ta.getTaskAttemptId();
             TaskCluster tc = ta.getTask().getTaskCluster();
             TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
             if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
-                LOGGER.info("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
+                LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
                 ta.setStatus(TaskAttempt.TaskStatus.FAILED, details);
                 abortTaskCluster(lastAttempt);
                 lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
diff --git a/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
index 6052443..c09f7b4 100644
--- a/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
+++ b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
@@ -19,6 +19,9 @@
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
 
 import org.junit.Test;
 
@@ -32,17 +35,13 @@
 public class NetTest {
     @Test
     public void test() throws Exception {
-        MuxDemux md1 = createMuxDemux("md1");
-        md1.start();
-        InetSocketAddress md1Address = md1.getLocalAddress();
-        System.err.println("md1Address: " + md1Address);
+        AtomicBoolean failFlag = new AtomicBoolean();
 
-        MuxDemux md2 = createMuxDemux("md2");
+        MuxDemux md1 = createMuxDemux("md1", failFlag);
+        md1.start();
+        MuxDemux md2 = createMuxDemux("md2", failFlag);
         md2.start();
         InetSocketAddress md2Address = md2.getLocalAddress();
-        System.err.println("md2Address: " + md2Address);
-
-        System.err.println("Started");
 
         MultiplexedConnection md1md2 = md1.connect(md2Address);
 
@@ -53,6 +52,8 @@
 
         t1.join();
         t2.join();
+
+        Assert.assertFalse("Failure flag was set to true", failFlag.get());
     }
 
     private Thread createThread(final MultiplexedConnection md1md2, final int factor) {
@@ -104,12 +105,11 @@
 
     }
 
-    private MuxDemux createMuxDemux(final String label) {
+    private MuxDemux createMuxDemux(final String label, final AtomicBoolean failFlag) {
         IChannelOpenListener md1OpenListener = new IChannelOpenListener() {
             @Override
             public void channelOpened(final ChannelControlBlock channel) {
                 final ChannelIO cio = new ChannelIO(label, channel);
-                System.err.println(label + ": Channel Opened");
                 channel.getReadInterface().setFullBufferAcceptor(cio.rifba);
                 channel.getWriteInterface().setEmptyBufferAcceptor(cio.wieba);
 
@@ -118,6 +118,8 @@
                     rieba.accept(ByteBuffer.allocate(1024));
                 }
                 new Thread() {
+                    private int prevTotal = 0;
+
                     @Override
                     public void run() {
                         while (true) {
@@ -136,7 +138,6 @@
                                     throw new RuntimeException("Error: " + cio.ecode);
                                 } else if (cio.eos) {
                                     channel.getWriteInterface().getFullBufferAcceptor().close();
-                                    System.err.println("Channel Closed");
                                     return;
                                 }
                             }
@@ -144,9 +145,14 @@
                             while (fbuf.remaining() > 0) {
                                 counter += fbuf.getInt();
                             }
+                            if (prevTotal != 0) {
+                                if (Math.abs(counter - prevTotal) != 256) {
+                                    failFlag.set(true);
+                                }
+                            }
+                            prevTotal = counter;
                             fbuf.compact();
                             rieba.accept(fbuf);
-                            System.err.println("Received: total: " + counter);
                         }
                     }
                 }.start();