Added profile reporting capability
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@169 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
index f965278..7cdaee1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
@@ -29,6 +29,9 @@
@Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
public int maxHeartbeatLapsePeriods = 5;
+ @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0)")
+ public int profileDumpPeriod = 0;
+
@Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
public boolean useJOL = false;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
index 3b6e8aa..2b78fbb 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.control;
import java.rmi.Remote;
+import java.util.Map;
import java.util.UUID;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
@@ -30,4 +31,6 @@
public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
public void nodeHeartbeat(String id) throws Exception;
+
+ public void reportProfile(String id, Map<String, Long> counterDump) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NCConfig.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NCConfig.java
index 8d1920f..64f124b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NCConfig.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NCConfig.java
@@ -18,7 +18,9 @@
import org.kohsuke.args4j.Option;
-public class NCConfig implements Serializable{
+public class NCConfig implements Serializable {
+ private static final long serialVersionUID = 1L;
+
@Option(name = "-cc-host", usage = "Cluster Controller host name")
public String ccHost;
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NodeParameters.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NodeParameters.java
index f21a375..8745b32 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NodeParameters.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/NodeParameters.java
@@ -25,6 +25,8 @@
private int heartbeatPeriod;
+ private int profileDumpPeriod;
+
public ClusterControllerInfo getClusterControllerInfo() {
return ccInfo;
}
@@ -40,4 +42,12 @@
public void setHeartbeatPeriod(int heartbeatPeriod) {
this.heartbeatPeriod = heartbeatPeriod;
}
+
+ public int getProfileDumpPeriod() {
+ return profileDumpPeriod;
+ }
+
+ public void setProfileDumpPeriod(int profileDumpPeriod) {
+ this.profileDumpPeriod = profileDumpPeriod;
+ }
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
index 78d8875..1ca0ac7 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
@@ -14,11 +14,9 @@
*/
package edu.uci.ics.hyracks.api.job.profiling.counters;
-import java.io.Serializable;
-
-public interface ICounter extends Serializable {
+public interface ICounter {
/**
- * Get the name of the counter.
+ * Get the fully-qualified name of the counter.
*
* @return Name of the counter.
*/
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCConfig.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCConfig.java
deleted file mode 100644
index c7597ee..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc;
-
-import org.kohsuke.args4j.Option;
-
-public class CCConfig {
- @Option(name = "-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
- public int port = 1099;
-
- @Option(name = "-http-port", usage = "Sets the http port for the admin console")
- public int httpPort;
-
- @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
- public int heartbeatPeriod = 10000;
-
- @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
- public int maxHeartbeatLapsePeriods = 5;
-
- @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
- public boolean useJOL = false;
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
index 8104362..3e36a37 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
@@ -16,6 +16,7 @@
import org.kohsuke.args4j.CmdLineParser;
+import edu.uci.ics.hyracks.api.control.CCConfig;
public class CCDriver {
public static void main(String args[]) throws Exception {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 212524d..723d767 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -59,6 +59,7 @@
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.control.CCConfig;
import edu.uci.ics.hyracks.api.control.IClusterController;
import edu.uci.ics.hyracks.api.control.INodeController;
import edu.uci.ics.hyracks.api.control.NodeParameters;
@@ -165,6 +166,7 @@
NodeParameters params = new NodeParameters();
params.setClusterControllerInfo(info);
params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
+ params.setProfileDumpPeriod(ccConfig.profileDumpPeriod);
return params;
}
@@ -226,6 +228,12 @@
return jobManager.waitForCompletion(jobId);
}
+ @Override
+ public void reportProfile(String id, Map<String, Long> counterDump) throws Exception {
+ if (LOGGER.isLoggable(Level.INFO))
+ LOGGER.info("Profile: " + id + ": " + counterDump);
+ }
+
private Handler getAdminConsoleHandler() {
ContextHandler handler = new ContextHandler("/admin");
handler.setHandler(new AbstractHandler() {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
index 440cb0d..8529814 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
@@ -19,8 +19,6 @@
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
public class Counter implements ICounter {
- private static final long serialVersionUID = 1L;
-
private final String name;
private final AtomicLong counter;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index cb0128d..f83a3c1 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -15,7 +15,9 @@
package edu.uci.ics.hyracks.control.nc;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
@@ -101,4 +103,20 @@
public NodeControllerService getNodeController() {
return nodeController;
}
+
+ public void dumpProfile(Map<String, Long> counterDump) {
+ Set<UUID> stageIds;
+ synchronized (this) {
+ stageIds = new HashSet<UUID>(stageletMap.keySet());
+ }
+ for (UUID stageId : stageIds) {
+ Stagelet si;
+ synchronized (this) {
+ si = stageletMap.get(stageId);
+ }
+ if (si != null) {
+ si.dumpProfile(counterDump);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 77c0271..d9c5cda 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -24,12 +24,15 @@
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -139,6 +142,11 @@
// Schedule heartbeat generator.
timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
+ if (nodeParameters.getProfileDumpPeriod() > 0) {
+ // Schedule profile dump generator.
+ timer.schedule(new ProfileDumpTask(cc), 0, nodeParameters.getProfileDumpPeriod());
+ }
+
LOGGER.log(Level.INFO, "Started NodeControllerService");
}
@@ -501,6 +509,37 @@
}
}
+ private class ProfileDumpTask extends TimerTask {
+ private IClusterController cc;
+
+ public ProfileDumpTask(IClusterController cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Map<String, Long> counterDump = new TreeMap<String, Long>();
+ Set<UUID> jobIds;
+ synchronized (NodeControllerService.this) {
+ jobIds = new HashSet<UUID>(jobletMap.keySet());
+ }
+ for(UUID jobId : jobIds) {
+ Joblet ji;
+ synchronized (NodeControllerService.this) {
+ ji = jobletMap.get(jobId);
+ }
+ if (ji != null) {
+ ji.dumpProfile(counterDump);
+ }
+ }
+ cc.reportProfile(id, counterDump);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
@Override
public synchronized void abortJoblet(UUID jobId, UUID stageId) throws Exception {
Joblet ji = jobletMap.get(jobId);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index 9b37aff..afbb95d 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -172,4 +172,8 @@
public StageletStatistics getStatistics() {
return stats;
}
+
+ public void dumpProfile(Map<String, Long> counterDump) {
+ stageletCounterContext.dump(counterDump);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java
index b883683..401dd57 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java
@@ -22,26 +22,32 @@
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
public class CounterContext implements ICounterContext {
- private final String name;
+ private final String contextName;
private final Map<String, Counter> counterMap;
public CounterContext(String name) {
- this.name = name;
+ this.contextName = name;
counterMap = new HashMap<String, Counter>();
}
@Override
- public synchronized ICounter getCounter(String name, boolean create) {
- Counter counter = counterMap.get(name);
+ public synchronized ICounter getCounter(String counterName, boolean create) {
+ Counter counter = counterMap.get(counterName);
if (counter == null && create) {
- counter = new Counter(name);
- counterMap.put(name, counter);
+ counter = new Counter(contextName + "." + counterName);
+ counterMap.put(counterName, counter);
}
return counter;
}
@Override
public String getContextName() {
- return name;
+ return contextName;
+ }
+
+ public synchronized void dump(Map<String, Long> dumpMap) {
+ for (Counter c : counterMap.values()) {
+ dumpMap.put(c.getName(), c.get());
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 077c917..c0b6d34 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -22,10 +22,10 @@
import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.control.CCConfig;
import edu.uci.ics.hyracks.api.control.NCConfig;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.cc.CCConfig;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;