Added profile reporting capability

git-svn-id: https://hyracks.googlecode.com/svn/trunk@169 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
index f965278..7cdaee1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
@@ -29,6 +29,9 @@
     @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
     public int maxHeartbeatLapsePeriods = 5;
 
+    @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0)")
+    public int profileDumpPeriod = 0;
+
     @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
     public boolean useJOL = false;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
index 3b6e8aa..2b78fbb 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.control;
 
 import java.rmi.Remote;
+import java.util.Map;
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
@@ -30,4 +31,6 @@
     public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
 
     public void nodeHeartbeat(String id) throws Exception;
+
+    public void reportProfile(String id, Map<String, Long> counterDump) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NCConfig.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NCConfig.java
index 8d1920f..64f124b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NCConfig.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NCConfig.java
@@ -18,7 +18,9 @@
 
 import org.kohsuke.args4j.Option;
 
-public class NCConfig implements Serializable{
+public class NCConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+
     @Option(name = "-cc-host", usage = "Cluster Controller host name")
     public String ccHost;
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NodeParameters.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NodeParameters.java
index f21a375..8745b32 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NodeParameters.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NodeParameters.java
@@ -25,6 +25,8 @@
 
     private int heartbeatPeriod;
 
+    private int profileDumpPeriod;
+
     public ClusterControllerInfo getClusterControllerInfo() {
         return ccInfo;
     }
@@ -40,4 +42,12 @@
     public void setHeartbeatPeriod(int heartbeatPeriod) {
         this.heartbeatPeriod = heartbeatPeriod;
     }
+
+    public int getProfileDumpPeriod() {
+        return profileDumpPeriod;
+    }
+
+    public void setProfileDumpPeriod(int profileDumpPeriod) {
+        this.profileDumpPeriod = profileDumpPeriod;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
index 78d8875..1ca0ac7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
@@ -14,11 +14,9 @@
  */
 package edu.uci.ics.hyracks.api.job.profiling.counters;
 
-import java.io.Serializable;
-
-public interface ICounter extends Serializable {
+public interface ICounter {
     /**
-     * Get the name of the counter.
+     * Get the fully-qualified name of the counter.
      * 
      * @return Name of the counter.
      */
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCConfig.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCConfig.java
deleted file mode 100644
index c7597ee..0000000
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc;
-
-import org.kohsuke.args4j.Option;
-
-public class CCConfig {
-    @Option(name = "-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
-    public int port = 1099;
-
-    @Option(name = "-http-port", usage = "Sets the http port for the admin console")
-    public int httpPort;
-
-    @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
-    public int heartbeatPeriod = 10000;
-
-    @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
-    public int maxHeartbeatLapsePeriods = 5;
-
-    @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
-    public boolean useJOL = false;
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
index 8104362..3e36a37 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
@@ -16,6 +16,7 @@
 
 import org.kohsuke.args4j.CmdLineParser;
 
+import edu.uci.ics.hyracks.api.control.CCConfig;
 
 public class CCDriver {
     public static void main(String args[]) throws Exception {
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 212524d..723d767 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -59,6 +59,7 @@
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
 import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
 import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.control.CCConfig;
 import edu.uci.ics.hyracks.api.control.IClusterController;
 import edu.uci.ics.hyracks.api.control.INodeController;
 import edu.uci.ics.hyracks.api.control.NodeParameters;
@@ -165,6 +166,7 @@
         NodeParameters params = new NodeParameters();
         params.setClusterControllerInfo(info);
         params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
+        params.setProfileDumpPeriod(ccConfig.profileDumpPeriod);
         return params;
     }
 
@@ -226,6 +228,12 @@
         return jobManager.waitForCompletion(jobId);
     }
 
+    @Override
+    public void reportProfile(String id, Map<String, Long> counterDump) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO))
+            LOGGER.info("Profile: " + id + ": " + counterDump);
+    }
+
     private Handler getAdminConsoleHandler() {
         ContextHandler handler = new ContextHandler("/admin");
         handler.setHandler(new AbstractHandler() {
diff --git a/hyracks/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java b/hyracks/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
index 440cb0d..8529814 100644
--- a/hyracks/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
+++ b/hyracks/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
@@ -19,8 +19,6 @@
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
 
 public class Counter implements ICounter {
-    private static final long serialVersionUID = 1L;
-
     private final String name;
     private final AtomicLong counter;
 
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index cb0128d..f83a3c1 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -15,7 +15,9 @@
 package edu.uci.ics.hyracks.control.nc;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 
@@ -101,4 +103,20 @@
     public NodeControllerService getNodeController() {
         return nodeController;
     }
+
+    public void dumpProfile(Map<String, Long> counterDump) {
+        Set<UUID> stageIds;
+        synchronized (this) {
+            stageIds = new HashSet<UUID>(stageletMap.keySet());
+        }
+        for (UUID stageId : stageIds) {
+            Stagelet si;
+            synchronized (this) {
+                si = stageletMap.get(stageId);
+            }
+            if (si != null) {
+                si.dumpProfile(counterDump);
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 77c0271..d9c5cda 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -24,12 +24,15 @@
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -139,6 +142,11 @@
         // Schedule heartbeat generator.
         timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
 
+        if (nodeParameters.getProfileDumpPeriod() > 0) {
+            // Schedule profile dump generator.
+            timer.schedule(new ProfileDumpTask(cc), 0, nodeParameters.getProfileDumpPeriod());
+        }
+
         LOGGER.log(Level.INFO, "Started NodeControllerService");
     }
 
@@ -501,6 +509,37 @@
         }
     }
 
+    private class ProfileDumpTask extends TimerTask {
+        private IClusterController cc;
+
+        public ProfileDumpTask(IClusterController cc) {
+            this.cc = cc;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Map<String, Long> counterDump = new TreeMap<String, Long>();
+                Set<UUID> jobIds;
+                synchronized (NodeControllerService.this) {
+                    jobIds = new HashSet<UUID>(jobletMap.keySet());
+                }
+                for(UUID jobId : jobIds) {
+                    Joblet ji;
+                    synchronized (NodeControllerService.this) {
+                        ji = jobletMap.get(jobId);
+                    }
+                    if (ji != null) {
+                        ji.dumpProfile(counterDump);
+                    }
+                }
+                cc.reportProfile(id, counterDump);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
     @Override
     public synchronized void abortJoblet(UUID jobId, UUID stageId) throws Exception {
         Joblet ji = jobletMap.get(jobId);
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index 9b37aff..afbb95d 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -172,4 +172,8 @@
     public StageletStatistics getStatistics() {
         return stats;
     }
+
+    public void dumpProfile(Map<String, Long> counterDump) {
+        stageletCounterContext.dump(counterDump);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java
index b883683..401dd57 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java
@@ -22,26 +22,32 @@
 import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
 
 public class CounterContext implements ICounterContext {
-    private final String name;
+    private final String contextName;
     private final Map<String, Counter> counterMap;
 
     public CounterContext(String name) {
-        this.name = name;
+        this.contextName = name;
         counterMap = new HashMap<String, Counter>();
     }
 
     @Override
-    public synchronized ICounter getCounter(String name, boolean create) {
-        Counter counter = counterMap.get(name);
+    public synchronized ICounter getCounter(String counterName, boolean create) {
+        Counter counter = counterMap.get(counterName);
         if (counter == null && create) {
-            counter = new Counter(name);
-            counterMap.put(name, counter);
+            counter = new Counter(contextName + "." + counterName);
+            counterMap.put(counterName, counter);
         }
         return counter;
     }
 
     @Override
     public String getContextName() {
-        return name;
+        return contextName;
+    }
+
+    public synchronized void dump(Map<String, Long> dumpMap) {
+        for (Counter c : counterMap.values()) {
+            dumpMap.put(c.getName(), c.get());
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 077c917..c0b6d34 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -22,10 +22,10 @@
 
 import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.control.CCConfig;
 import edu.uci.ics.hyracks.api.control.NCConfig;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.cc.CCConfig;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;