Added a bound on how much historical job information is kept arouund. Added fine grained profiling support to partitions. Made the build a little quieter.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@742 123451ca-8445-de46-9d55-352943316053
diff --git a/config/logging.properties b/config/logging.properties
new file mode 100644
index 0000000..16e6ae0
--- /dev/null
+++ b/config/logging.properties
@@ -0,0 +1,42 @@
+# "handlers" specifies a comma separated list of log Handler
+# classes. These handlers will be installed during VM startup.
+# Note that these classes must be on the system classpath.
+# By default we only configure a ConsoleHandler, which will only
+# show messages at the INFO and above levels.
+handlers= java.util.logging.ConsoleHandler
+
+# To also add the FileHandler, use the following line instead.
+#handlers= java.util.logging.FileHandler, java.util.logging.ConsoleHandler
+
+# Default global logging level.
+# This specifies which kinds of events are logged across
+# all loggers. For any given facility this global level
+# can be overriden by a facility specific level
+# Note that the ConsoleHandler also has a separate level
+# setting to limit messages printed to the console.
+.level= SEVERE
+
+############################################################
+# Handler specific properties.
+# Describes specific configuration info for Handlers.
+############################################################
+
+# default file output is in user's home directory.
+java.util.logging.FileHandler.pattern = %h/java%u.log
+java.util.logging.FileHandler.limit = 50000
+java.util.logging.FileHandler.count = 1
+java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter
+
+# Limit the message that are printed on the console to INFO and above.
+java.util.logging.ConsoleHandler.level = INFO
+java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
+
+
+############################################################
+# Facility specific properties.
+# Provides extra control for each logger.
+############################################################
+
+# For example, set the com.xyz.foo logger to only log SEVERE
+# messages:
+com.xyz.foo.level = SEVERE
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 57714aa..7589509 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -93,7 +93,9 @@
private ClusterControllerInfo info;
- private final Map<JobId, JobRun> runMap;
+ private final Map<JobId, JobRun> activeRunMap;
+
+ private final Map<JobId, JobRun> runMapArchive;
private final WorkQueue workQueue;
@@ -109,7 +111,7 @@
private long jobCounter;
- public ClusterControllerService(CCConfig ccConfig) throws Exception {
+ public ClusterControllerService(final CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
ipAddressNodeNameMap = new HashMap<String, Set<String>>();
@@ -118,7 +120,14 @@
ClusterControllerService.class.getName()));
taskExecutor = Executors.newCachedThreadPool();
webServer = new WebServer(this);
- runMap = new HashMap<JobId, JobRun>();
+ activeRunMap = new HashMap<JobId, JobRun>();
+ runMapArchive = new LinkedHashMap<JobId, JobRun>() {
+ private static final long serialVersionUID = 1L;
+
+ protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
+ return size() > ccConfig.jobHistorySize;
+ }
+ };
workQueue = new WorkQueue();
this.timer = new Timer(true);
ccci = new CCClientInterface(this);
@@ -160,8 +169,12 @@
return applications;
}
- public Map<JobId, JobRun> getRunMap() {
- return runMap;
+ public Map<JobId, JobRun> getActiveRunMap() {
+ return activeRunMap;
+ }
+
+ public Map<JobId, JobRun> getRunMapArchive() {
+ return runMapArchive;
}
public WorkQueue getWorkQueue() {
@@ -222,7 +235,7 @@
@Override
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
- TaskCompleteWork sce = new TaskCompleteWork(this, jobId, taskId, nodeId);
+ TaskCompleteWork sce = new TaskCompleteWork(this, jobId, taskId, nodeId, statistics);
workQueue.schedule(sce);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/HyracksAdminConsoleApplication.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/HyracksAdminConsoleApplication.java
index 5c033dd..a6113de 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/HyracksAdminConsoleApplication.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/HyracksAdminConsoleApplication.java
@@ -19,8 +19,6 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.adminconsole.pages.IndexPage;
-import edu.uci.ics.hyracks.control.cc.adminconsole.pages.JobsSummaryPage;
-import edu.uci.ics.hyracks.control.cc.adminconsole.pages.NodesSummaryPage;
public class HyracksAdminConsoleApplication extends WebApplication {
private ClusterControllerService ccs;
@@ -28,8 +26,6 @@
@Override
public void init() {
ccs = (ClusterControllerService) getServletContext().getAttribute(ClusterControllerService.class.getName());
- mountPage("jobs", JobsSummaryPage.class);
- mountPage("nodes", NodesSummaryPage.class);
}
@Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/IndexPage.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/IndexPage.java
index 904e2d8..3200097 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/IndexPage.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/IndexPage.java
@@ -14,6 +14,74 @@
*/
package edu.uci.ics.hyracks.control.cc.adminconsole.pages;
+import org.apache.wicket.markup.html.basic.Label;
+import org.apache.wicket.markup.html.link.BookmarkablePageLink;
+import org.apache.wicket.markup.html.list.ListItem;
+import org.apache.wicket.markup.html.list.ListView;
+import org.apache.wicket.request.mapper.parameter.PageParameters;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.web.util.JSONUtils;
+import edu.uci.ics.hyracks.control.cc.work.GetJobSummariesJSONWork;
+import edu.uci.ics.hyracks.control.cc.work.GetNodeSummariesJSONWork;
+
public class IndexPage extends AbstractPage {
private static final long serialVersionUID = 1L;
+
+ public IndexPage() throws Exception {
+ ClusterControllerService ccs = getAdminConsoleApplication().getClusterControllerService();
+
+ GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs);
+ ccs.getWorkQueue().scheduleAndSync(gnse);
+ JSONArray nodeSummaries = gnse.getSummaries();
+ add(new Label("node-count", String.valueOf(nodeSummaries.length())));
+ ListView<JSONObject> nodeList = new ListView<JSONObject>("node-list", JSONUtils.toList(nodeSummaries)) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected void populateItem(ListItem<JSONObject> item) {
+ JSONObject o = item.getModelObject();
+ try {
+ item.add(new Label("node-id", o.getString("node-id")));
+ item.add(new Label("heap-used", o.getString("heap-used")));
+ item.add(new Label("system-load-average", o.getString("system-load-average")));
+ PageParameters params = new PageParameters();
+ params.add("node-id", o.getString("node-id"));
+ item.add(new BookmarkablePageLink("node-details", NodeDetailsPage.class, params));
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ add(nodeList);
+
+ GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs);
+ ccs.getWorkQueue().scheduleAndSync(gjse);
+ JSONArray jobSummaries = gjse.getSummaries();
+ ListView<JSONObject> jobList = new ListView<JSONObject>("jobs-list", JSONUtils.toList(jobSummaries)) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected void populateItem(ListItem<JSONObject> item) {
+ JSONObject o = item.getModelObject();
+ try {
+ item.add(new Label("job-id", o.getString("job-id")));
+ item.add(new Label("application-name", o.getString("application-name")));
+ item.add(new Label("status", o.getString("status")));
+ item.add(new Label("create-time", o.getString("create-time")));
+ item.add(new Label("start-time", o.getString("start-time")));
+ item.add(new Label("end-time", o.getString("end-time")));
+ PageParameters params = new PageParameters();
+ params.add("job-id", o.getString("node-id"));
+ item.add(new BookmarkablePageLink("job-details", JobDetailsPage.class, params));
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ add(jobList);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
new file mode 100644
index 0000000..153f90c
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.adminconsole.pages;
+
+import org.apache.wicket.request.mapper.parameter.PageParameters;
+
+public class JobDetailsPage extends AbstractPage {
+ private static final long serialVersionUID = 1L;
+
+ public JobDetailsPage(PageParameters params) throws Exception {
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobsSummaryPage.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobsSummaryPage.java
deleted file mode 100644
index 640501e..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobsSummaryPage.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.adminconsole.pages;
-
-import org.apache.wicket.markup.html.basic.Label;
-import org.apache.wicket.markup.html.list.ListItem;
-import org.apache.wicket.markup.html.list.ListView;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.web.util.JSONUtils;
-import edu.uci.ics.hyracks.control.cc.work.GetJobSummariesJSONWork;
-
-public class JobsSummaryPage extends AbstractPage {
- private static final long serialVersionUID = 1L;
-
- public JobsSummaryPage() throws Exception {
- ClusterControllerService ccs = getAdminConsoleApplication().getClusterControllerService();
-
- GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs);
- ccs.getWorkQueue().scheduleAndSync(gjse);
- JSONArray summaries = gjse.getSummaries();
- ListView<JSONObject> jobList = new ListView<JSONObject>("job-list", JSONUtils.toList(summaries)) {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected void populateItem(ListItem<JSONObject> item) {
- JSONObject o = item.getModelObject();
- try {
- item.add(new Label("job-id", o.getString("job-id")));
- item.add(new Label("application-name", o.getString("application-name")));
- item.add(new Label("status", o.getString("status")));
- item.add(new Label("create-time", o.getString("create-time")));
- item.add(new Label("start-time", o.getString("start-time")));
- item.add(new Label("end-time", o.getString("end-time")));
- } catch (JSONException e) {
- throw new RuntimeException(e);
- }
- }
- };
- add(jobList);
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodesSummaryPage.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodesSummaryPage.java
deleted file mode 100644
index 8442bea..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodesSummaryPage.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.adminconsole.pages;
-
-import org.apache.wicket.markup.html.basic.Label;
-import org.apache.wicket.markup.html.link.BookmarkablePageLink;
-import org.apache.wicket.markup.html.list.ListItem;
-import org.apache.wicket.markup.html.list.ListView;
-import org.apache.wicket.request.mapper.parameter.PageParameters;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.web.util.JSONUtils;
-import edu.uci.ics.hyracks.control.cc.work.GetNodeSummariesJSONWork;
-
-public class NodesSummaryPage extends AbstractPage {
- private static final long serialVersionUID = 1L;
-
- public NodesSummaryPage() throws Exception {
- ClusterControllerService ccs = getAdminConsoleApplication().getClusterControllerService();
-
- GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs);
- ccs.getWorkQueue().scheduleAndSync(gnse);
- JSONArray summaries = gnse.getSummaries();
- add(new Label("node-count", String.valueOf(summaries.length())));
- ListView<JSONObject> nodeList = new ListView<JSONObject>("node-list", JSONUtils.toList(summaries)) {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected void populateItem(ListItem<JSONObject> item) {
- JSONObject o = item.getModelObject();
- try {
- item.add(new Label("node-id", o.getString("node-id")));
- item.add(new Label("heap-used", o.getString("heap-used")));
- item.add(new Label("system-load-average", o.getString("system-load-average")));
- PageParameters params = new PageParameters();
- params.add("node-id", o.getString("node-id"));
- item.add(new BookmarkablePageLink("node-details", NodeDetailsPage.class, params));
- } catch (JSONException e) {
- throw new RuntimeException(e);
- }
- }
- };
- add(nodeList);
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index b240da4..a00f4c4 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -46,7 +46,7 @@
@Override
public final void run() {
- JobRun run = ccs.getRunMap().get(jobId);
+ JobRun run = ccs.getActiveRunMap().get(jobId);
if (run != null) {
TaskId tid = taId.getTaskId();
Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobActivityGraphJSONWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobActivityGraphJSONWork.java
index 28823c2..677bece 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobActivityGraphJSONWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobActivityGraphJSONWork.java
@@ -33,10 +33,13 @@
@Override
protected void doRun() throws Exception {
- JobRun run = ccs.getRunMap().get(jobId);
+ JobRun run = ccs.getActiveRunMap().get(jobId);
if (run == null) {
- json = new JSONObject();
- return;
+ run = ccs.getRunMapArchive().get(jobId);
+ if (run == null) {
+ json = new JSONObject();
+ return;
+ }
}
json = run.getJobActivityGraph().toJSON();
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobProfileJSONWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobProfileJSONWork.java
deleted file mode 100644
index 513cb93..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobProfileJSONWork.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import org.json.JSONObject;
-
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetJobProfileJSONWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
- private final JobId jobId;
- private JSONObject json;
-
- public GetJobProfileJSONWork(ClusterControllerService ccs, JobId jobId) {
- this.ccs = ccs;
- this.jobId = jobId;
- }
-
- @Override
- protected void doRun() throws Exception {
- json = new JSONObject();
- JobRun jobRun = ccs.getRunMap().get(jobId);
- if (jobRun == null) {
- json = new JSONObject();
- return;
- }
- json = jobRun.getJobProfile().toJSON();
- }
-
- public JSONObject getJSON() {
- return json;
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobRunJSONWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobRunJSONWork.java
index 9de7613..07c8f87 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobRunJSONWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobRunJSONWork.java
@@ -33,10 +33,13 @@
@Override
protected void doRun() throws Exception {
- JobRun run = ccs.getRunMap().get(jobId);
+ JobRun run = ccs.getActiveRunMap().get(jobId);
if (run == null) {
- json = new JSONObject();
- return;
+ run = ccs.getRunMapArchive().get(jobId);
+ if (run == null) {
+ json = new JSONObject();
+ return;
+ }
}
json = run.toJSON();
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSpecificationJSONWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSpecificationJSONWork.java
index 7e16c9c..05b7cae 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSpecificationJSONWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSpecificationJSONWork.java
@@ -33,10 +33,13 @@
@Override
protected void doRun() throws Exception {
- JobRun run = ccs.getRunMap().get(jobId);
+ JobRun run = ccs.getActiveRunMap().get(jobId);
if (run == null) {
- json = new JSONObject();
- return;
+ run = ccs.getRunMapArchive().get(jobId);
+ if (run == null) {
+ json = new JSONObject();
+ return;
+ }
}
json = run.getJobActivityGraph().getJobSpecification().toJSON();
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusConditionVariableWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusConditionVariableWork.java
index 67eca4a..ff67928 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusConditionVariableWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusConditionVariableWork.java
@@ -31,7 +31,10 @@
@Override
protected void doRun() throws Exception {
- cVar = ccs.getRunMap().get(jobId);
+ cVar = ccs.getActiveRunMap().get(jobId);
+ if (cVar == null) {
+ cVar = ccs.getRunMapArchive().get(jobId);
+ }
}
public IJobStatusConditionVariable getConditionVariable() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java
index cd29207..7a8943a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java
@@ -32,7 +32,10 @@
@Override
protected void doRun() throws Exception {
- JobRun run = ccs.getRunMap().get(jobId);
+ JobRun run = ccs.getActiveRunMap().get(jobId);
+ if (run == null) {
+ run = ccs.getRunMapArchive().get(jobId);
+ }
status = run == null ? null : run.getStatus();
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java
index 64b9058..5cc31f6 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java
@@ -14,7 +14,10 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
+import java.util.Collection;
+
import org.json.JSONArray;
+import org.json.JSONException;
import org.json.JSONObject;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -32,7 +35,12 @@
@Override
protected void doRun() throws Exception {
summaries = new JSONArray();
- for (JobRun run : ccs.getRunMap().values()) {
+ populateJSON(ccs.getActiveRunMap().values());
+ populateJSON(ccs.getRunMapArchive().values());
+ }
+
+ private void populateJSON(Collection<JobRun> jobRuns) throws JSONException {
+ for (JobRun run : jobRuns) {
JSONObject jo = new JSONObject();
jo.put("type", "job-summary");
jo.put("job-id", run.getJobId().toString());
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 1af4993..20ffc55 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -42,7 +42,7 @@
@Override
public void run() {
- final JobRun run = ccs.getRunMap().get(jobId);
+ final JobRun run = ccs.getActiveRunMap().get(jobId);
Set<String> targetNodes = run.getParticipatingNodeIds();
final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
int i = 0;
@@ -76,6 +76,8 @@
}
}
run.setStatus(status, exception);
+ ccs.getActiveRunMap().remove(jobId);
+ ccs.getRunMapArchive().put(jobId, run);
}
});
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
index 7ede593..14947cd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
@@ -70,7 +70,7 @@
run.setStatus(JobStatus.INITIALIZED, null);
- ccs.getRunMap().put(jobId, run);
+ ccs.getActiveRunMap().put(jobId, run);
JobScheduler jrs = new JobScheduler(ccs, run);
run.setScheduler(jrs);
appCtx.notifyJobCreation(jobId, spec);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index d195cab..855e3e7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -31,7 +31,7 @@
@Override
protected void doRun() throws Exception {
- JobRun run = ccs.getRunMap().get(jobId);
+ JobRun run = ccs.getActiveRunMap().get(jobId);
if (run == null) {
throw new Exception("Unable to find job with id = " + jobId);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index ec9a356..87b5878 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -38,7 +38,7 @@
@Override
public void run() {
final PartitionId pid = partitionDescriptor.getPartitionId();
- JobRun run = ccs.getRunMap().get(pid.getJobId());
+ JobRun run = ccs.getActiveRunMap().get(pid.getJobId());
if (run == null) {
return;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java
index a7a36cb..19716a4 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java
@@ -36,7 +36,7 @@
@Override
public void run() {
PartitionId pid = partitionRequest.getPartitionId();
- JobRun run = ccs.getRunMap().get(pid.getJobId());
+ JobRun run = ccs.getActiveRunMap().get(pid.getJobId());
if (run == null) {
return;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
index 889faa2..3575f81 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -62,13 +62,16 @@
}
}
}
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Number of affected jobs: " + affectedJobIds.size());
- }
- for (JobId jobId : affectedJobIds) {
- JobRun run = ccs.getRunMap().get(jobId);
- if (run != null) {
- run.getScheduler().notifyNodeFailures(deadNodes);
+ int size = affectedJobIds.size();
+ if (size > 0) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Number of affected jobs: " + size);
+ }
+ for (JobId jobId : affectedJobIds) {
+ JobRun run = ccs.getActiveRunMap().get(jobId);
+ if (run != null) {
+ run.getScheduler().notifyNodeFailures(deadNodes);
+ }
}
}
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportProfilesWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportProfilesWork.java
index df12ee9..b8eba7e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportProfilesWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportProfilesWork.java
@@ -35,9 +35,13 @@
@Override
public void run() {
- Map<JobId, JobRun> runMap = ccs.getRunMap();
+ Map<JobId, JobRun> runMap = ccs.getActiveRunMap();
for (JobProfile profile : profiles) {
JobRun run = runMap.get(profile.getJobId());
+ if (run != null) {
+ JobProfile jp = run.getJobProfile();
+ jp.merge(profile);
+ }
}
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
index 0d12c65..c0af50b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
@@ -19,17 +19,33 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
public class TaskCompleteWork extends AbstractTaskLifecycleWork {
- public TaskCompleteWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId) {
+ private final TaskProfile statistics;
+
+ public TaskCompleteWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
+ TaskProfile statistics) {
super(ccs, jobId, taId, nodeId);
+ this.statistics = statistics;
}
@Override
protected void performEvent(TaskAttempt ta) {
try {
ActivityCluster ac = ta.getTaskState().getTaskCluster().getActivityCluster();
+ JobRun run = ac.getJobRun();
+ if (statistics != null) {
+ JobProfile jobProfile = run.getJobProfile();
+ JobletProfile jobletProfile = jobProfile.getJobletProfiles().get(nodeId);
+ if (jobletProfile != null) {
+ jobletProfile.getTaskProfiles().put(taId, statistics);
+ }
+ }
ac.getJobRun().getScheduler().notifyTaskComplete(ta);
} catch (HyracksException e) {
e.printStackTrace();
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/IndexPage.html b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/IndexPage.html
index 6942618..3f8719e 100644
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/IndexPage.html
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/IndexPage.html
@@ -1,3 +1,46 @@
<wicket:extend xmlns:wicket>
-<h1>Index</h1>
+ <div>
+ Registered Node Count: <span wicket:id="node-count"></span>
+ </div>
+ <table>
+ <tr wicket:id="node-list">
+ <td>
+ <span wicket:id="node-id"></span>
+ </td>
+ <td>
+ <span wicket:id="heap-used"></span>
+ </td>
+ <td>
+ <span wicket:id="system-load-average"></span>
+ </td>
+ <td>
+ <a wicket:id="node-details">Details</a>
+ </td>
+ </tr>
+ </table>
+ <table>
+ <tr wicket:id="jobs-list">
+ <td>
+ <span wicket:id="job-id"></span>
+ </td>
+ <td>
+ <span wicket:id="application-name"></span>
+ </td>
+ <td>
+ <span wicket:id="status"></span>
+ </td>
+ <td>
+ <span wicket:id="create-time"></span>
+ </td>
+ <td>
+ <span wicket:id="start-time"></span>
+ </td>
+ <td>
+ <span wicket:id="end-time"></span>
+ </td>
+ <td>
+ <a wicket:id="job-details">Details</a>
+ </td>
+ </tr>
+ </table>
</wicket:extend>
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html
new file mode 100644
index 0000000..b740fed
--- /dev/null
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html
@@ -0,0 +1,12 @@
+<wicket:extend xmlns:wicket>
+ <script type="text/javascript">
+ $(document).ready(function() {
+ $.timer(function() {
+ window.location.reload();
+ }, 10000, true);
+ $.get('/rest/jobs/' + $.getURLParam('job-id') + '/job-specification, function(data) {
+ var result = data.result;
+ });
+ });
+ </script>
+</wicket:extend>
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobsSummaryPage.html b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobsSummaryPage.html
deleted file mode 100644
index 7c5a6e3..0000000
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobsSummaryPage.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<wicket:extend xmlns:wicket>
- <table>
- <tr wicket:id="jobs-list">
- <td>
- <span wicket:id="job-id"></span>
- </td>
- <td>
- <span wicket:id="application-name"></span>
- </td>
- <td>
- <span wicket:id="status"></span>
- </td>
- <td>
- <span wicket:id="create-time"></span>
- </td>
- <td>
- <span wicket:id="start-time"></span>
- </td>
- <td>
- <span wicket:id="end-time"></span>
- </td>
- </tr>
- </table>
-</wicket:extend>
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodesSummaryPage.html b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodesSummaryPage.html
deleted file mode 100644
index f868ef1..0000000
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodesSummaryPage.html
+++ /dev/null
@@ -1,21 +0,0 @@
-<wicket:extend xmlns:wicket>
- <div>
- Registered Node Count: <span wicket:id="node-count"></span>
- </div>
- <table>
- <tr wicket:id="node-list">
- <td>
- <span wicket:id="node-id"></span>
- </td>
- <td>
- <span wicket:id="heap-used"></span>
- </td>
- <td>
- <span wicket:id="system-load-average"></span>
- </td>
- <td>
- <a wicket:id="node-details">Details</a>
- </td>
- </tr>
- </table>
-</wicket:extend>
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index a9f8da3..916ef77 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -37,6 +37,9 @@
@Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not specified in the job specification. (default: 5)")
public int defaultMaxJobAttempts = 5;
+ @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to the specified value. (default: 10)")
+ public int jobHistorySize = 10;
+
public void toCommandLine(List<String> cList) {
cList.add("-port");
cList.add(String.valueOf(port));
@@ -50,5 +53,7 @@
cList.add(String.valueOf(profileDumpPeriod));
cList.add("-default-max-job-attempts");
cList.add(String.valueOf(defaultMaxJobAttempts));
+ cList.add("-job-history-size");
+ cList.add(String.valueOf(jobHistorySize));
}
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
new file mode 100644
index 0000000..f6568d9
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionProfile implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+
+ private final long openTime;
+
+ private final long closeTime;
+
+ private final byte[] frameTimes;
+
+ public PartitionProfile(PartitionId pid, long openTime, long closeTime, byte[] frameTimes) {
+ this.pid = pid;
+ this.openTime = openTime;
+ this.closeTime = closeTime;
+ this.frameTimes = frameTimes;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public long getOpenTime() {
+ return openTime;
+ }
+
+ public long getCloseTime() {
+ return closeTime;
+ }
+
+ public byte[] getFrameTimes() {
+ return frameTimes;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
index 3ab1090..3764a01 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -14,24 +14,36 @@
*/
package edu.uci.ics.hyracks.control.common.job.profiling.om;
+import java.io.ByteArrayInputStream;
+import java.util.Map;
+
+import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class TaskProfile extends AbstractProfile {
private static final long serialVersionUID = 1L;
private final TaskAttemptId taskAttemptId;
- public TaskProfile(TaskAttemptId taskAttemptId) {
+ private final Map<PartitionId, PartitionProfile> partitionSendProfile;
+
+ public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile) {
this.taskAttemptId = taskAttemptId;
+ this.partitionSendProfile = partitionSendProfile;
}
public TaskAttemptId getTaskId() {
return taskAttemptId;
}
+ public Map<PartitionId, PartitionProfile> getPartitionSendProfile() {
+ return partitionSendProfile;
+ }
+
@Override
public JSONObject toJSON() throws JSONException {
JSONObject json = new JSONObject();
@@ -40,6 +52,41 @@
json.put("activity-id", taskAttemptId.getTaskId().getActivityId().toString());
json.put("partition", taskAttemptId.getTaskId().getPartition());
json.put("attempt", taskAttemptId.getAttempt());
+ if (partitionSendProfile != null) {
+ JSONArray pspArray = new JSONArray();
+ for (PartitionProfile pp : partitionSendProfile.values()) {
+ JSONObject ppObj = new JSONObject();
+ PartitionId pid = pp.getPartitionId();
+ JSONObject pidObj = new JSONObject();
+ pidObj.put("job-id", pid.getJobId());
+ pidObj.put("connector-id", pid.getConnectorDescriptorId());
+ pidObj.put("sender-index", pid.getSenderIndex());
+ pidObj.put("receiver-index", pid.getReceiverIndex());
+ ppObj.put("partition-id", pidObj);
+ ppObj.put("open-time", pp.getOpenTime());
+ ppObj.put("close-time", pp.getCloseTime());
+ JSONArray ftArray = new JSONArray();
+ byte[] ftb = pp.getFrameTimes();
+ ByteArrayInputStream bais = new ByteArrayInputStream(ftb);
+ long value = 0;
+ int vLen = 0;
+ long time = pp.getOpenTime();
+ for (int i = 0; i < ftb.length; ++i) {
+ byte b = (byte) bais.read();
+ ++vLen;
+ value += (((long) (b & 0xef)) << ((vLen - 1) * 7));
+ if ((b & 0x80) == 0) {
+ time += value;
+ ftArray.put(time);
+ vLen = 0;
+ value = 0;
+ }
+ }
+ ppObj.put("frame-times", ftArray);
+ pspArray.put(ppObj);
+ }
+ json.put("partition-send-profile", pspArray);
+ }
populateCounters(json);
return json;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index 9fe54a0..f5ae1f4 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -62,7 +62,7 @@
public void schedule(AbstractWork event) {
if (LOGGER.isLoggable(event.logLevel())) {
- LOGGER.info("Scheduling: " + event);
+ LOGGER.log(event.logLevel(), "Scheduling: " + event);
}
queue.offer(event);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 8404e99..381c44b 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -44,6 +45,7 @@
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -135,7 +137,7 @@
public synchronized void notifyTaskComplete(Task task) throws Exception {
taskMap.remove(task);
- TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId());
+ TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
task.dumpProfile(taskProfile);
nodeController.getClusterController().notifyTaskComplete(jobId, task.getTaskAttemptId(),
nodeController.getId(), taskProfile);
@@ -157,7 +159,8 @@
counters.put(e.getKey(), e.getValue().get());
}
for (Task task : taskMap.values()) {
- TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId());
+ TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(),
+ new Hashtable<PartitionId, PartitionProfile>(task.getPartitionSendProfile()));
task.dumpProfile(taskProfile);
jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 105e37e..aaaaa21 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
@@ -37,9 +38,11 @@
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -62,6 +65,8 @@
private final IOperatorEnvironment opEnv;
+ private final Map<PartitionId, PartitionProfile> partitionSendProfile;
+
private IPartitionCollector[] collectors;
private IOperatorNodePushable operator;
@@ -78,6 +83,7 @@
counterMap = new HashMap<String, Counter>();
opEnv = joblet.getEnvironment(taskId.getTaskId().getActivityId().getOperatorDescriptorId(), taskId.getTaskId()
.getPartition());
+ partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
}
public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -144,6 +150,10 @@
return this;
}
+ public Map<PartitionId, PartitionProfile> getPartitionSendProfile() {
+ return partitionSendProfile;
+ }
+
public synchronized void dumpProfile(TaskProfile tProfile) {
Map<String, Long> dumpMap = tProfile.getCounters();
for (Counter c : counterMap.values()) {
@@ -151,6 +161,10 @@
}
}
+ public void setPartitionSendProfile(PartitionProfile profile) {
+ partitionSendProfile.put(profile.getPartitionId(), profile);
+ }
+
public void start() throws HyracksException {
aborted = false;
joblet.getExecutor().execute(this);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
new file mode 100644
index 0000000..1011cf3
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.profiling;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class ProfilingPartitionWriterFactory implements IPartitionWriterFactory {
+ private final IHyracksTaskContext ctx;
+
+ private final IConnectorDescriptor cd;
+
+ private final int senderIndex;
+
+ private final IPartitionWriterFactory delegate;
+
+ public ProfilingPartitionWriterFactory(IHyracksTaskContext ctx, IConnectorDescriptor cd, int senderIndex,
+ IPartitionWriterFactory delegate) {
+ this.ctx = ctx;
+ this.cd = cd;
+ this.senderIndex = senderIndex;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public IFrameWriter createFrameWriter(final int receiverIndex) throws HyracksDataException {
+ final IFrameWriter writer = new ConnectorSenderProfilingFrameWriter(ctx,
+ delegate.createFrameWriter(receiverIndex), cd.getConnectorId(), senderIndex, receiverIndex);
+ return new IFrameWriter() {
+ private long openTime;
+
+ private long closeTime;
+
+ private long prevTime;
+
+ private ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ @Override
+ public void open() throws HyracksDataException {
+ baos.reset();
+ openTime = System.currentTimeMillis();
+ prevTime = openTime;
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ long time = System.currentTimeMillis();
+ long diff = time - prevTime;
+ prevTime = time;
+ do {
+ byte b = (byte) (diff & 0xef);
+ diff >>= 7;
+ if (diff != 0) {
+ b |= 0x80;
+ }
+ baos.write(b);
+ } while (diff != 0);
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ closeTime = System.currentTimeMillis();
+ ((Task) ctx).setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext()
+ .getJobId(), cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, baos
+ .toByteArray()));
+ writer.close();
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 23962ce..153eaff 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.control.nc.work;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
@@ -24,6 +25,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -36,6 +38,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
@@ -48,6 +51,7 @@
import edu.uci.ics.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
+import edu.uci.ics.hyracks.control.nc.profiling.ProfilingPartitionWriterFactory;
public class StartTasksWork extends SynchronizableWork {
private static final Logger LOGGER = Logger.getLogger(StartTasksWork.class.getName());
@@ -80,17 +84,17 @@
try {
Map<String, NCApplicationContext> applications = ncs.getApplications();
NCApplicationContext appCtx = applications.get(appName);
- final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
+ final JobActivityGraph jag = (JobActivityGraph) appCtx.deserialize(jagBytes);
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@Override
public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
- return plan.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
+ return jag.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
}
@Override
public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
- return plan.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
+ return jag.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
}
};
@@ -99,7 +103,7 @@
for (TaskAttemptDescriptor td : taskDescriptors) {
TaskAttemptId taId = td.getTaskAttemptId();
TaskId tid = taId.getTaskId();
- IActivity han = plan.getActivityNodeMap().get(tid.getActivityId());
+ IActivity han = jag.getActivityNodeMap().get(tid.getActivityId());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Initializing " + taId + " -> " + han);
}
@@ -109,7 +113,7 @@
List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
- List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getActivityId());
+ List<IConnectorDescriptor> inputs = jag.getActivityInputConnectorDescriptors(tid.getActivityId());
if (inputs != null) {
for (int i = 0; i < inputs.size(); ++i) {
IConnectorDescriptor conn = inputs.get(i);
@@ -117,21 +121,21 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("input: " + i + ": " + conn.getConnectorId());
}
- RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
+ RecordDescriptor recordDesc = jag.getJobSpecification().getConnectorRecordDescriptor(conn);
IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
recordDesc, cPolicy);
collectors.add(collector);
}
}
- List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
+ List<IConnectorDescriptor> outputs = jag.getActivityOutputConnectorDescriptors(tid.getActivityId());
if (outputs != null) {
for (int i = 0; i < outputs.size(); ++i) {
final IConnectorDescriptor conn = outputs.get(i);
- RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
+ RecordDescriptor recordDesc = jag.getJobSpecification().getConnectorRecordDescriptor(conn);
IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
- IPartitionWriterFactory pwFactory = createPartitionWriterFactory(cPolicy, jobId, conn,
- partition, taId);
+ IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
+ partition, taId, jag.getJobFlags());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("output: " + i + ": " + conn.getConnectorId());
@@ -176,11 +180,13 @@
}
}
- private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final JobId jobId,
- final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId) {
+ private IPartitionWriterFactory createPartitionWriterFactory(IHyracksTaskContext ctx, IConnectorPolicy cPolicy,
+ final JobId jobId, final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId,
+ EnumSet<JobFlag> flags) {
+ IPartitionWriterFactory factory;
if (cPolicy.materializeOnSendSide()) {
if (cPolicy.consumerWaitsForProducerToFinish()) {
- return new IPartitionWriterFactory() {
+ factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
return new MaterializedPartitionWriter(ncs.getRootContext(), ncs.getPartitionManager(),
@@ -189,7 +195,7 @@
}
};
} else {
- return new IPartitionWriterFactory() {
+ factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
return new MaterializingPipelinedPartition(ncs.getRootContext(), ncs.getPartitionManager(),
@@ -199,7 +205,7 @@
};
}
} else {
- return new IPartitionWriterFactory() {
+ factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
return new PipelinedPartition(ncs.getPartitionManager(), new PartitionId(jobId,
@@ -207,5 +213,9 @@
}
};
}
+ if (flags.contains(JobFlag.PROFILE_RUNTIME)) {
+ factory = new ProfilingPartitionWriterFactory(ctx, conn, senderIndex, factory);
+ }
+ return factory;
}
}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks-examples/hyracks-integration-tests/pom.xml
index 81507fc..7012cde 100644
--- a/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -21,14 +21,6 @@
<target>1.6</target>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <forkMode>pertest</forkMode>
- <argLine>-enableassertions</argLine>
- </configuration>
- </plugin>
</plugins>
</build>
<dependencies>
diff --git a/pom.xml b/pom.xml
index 243493f..0320e7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,6 +20,14 @@
<artifactId>versions-maven-plugin</artifactId>
<version>1.2</version>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions -Djava.util.logging.config.file=${basedir}/config/logging.properties</argLine>
+ </configuration>
+ </plugin>
</plugins>
</build>