Added the ability to collect profile. Added REST Api to provide access to job state
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@173 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
index 63cb572..753079b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
@@ -31,7 +31,6 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
abstract class AbstractHyracksConnection implements IHyracksClientConnection {
private final String ccHost;
@@ -95,7 +94,7 @@
}
@Override
- public JobStatistics waitForCompletion(UUID jobId) throws Exception {
- return hci.waitForCompletion(jobId);
+ public void waitForCompletion(UUID jobId) throws Exception {
+ hci.waitForCompletion(jobId);
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index b3c1148..a164fee 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -21,7 +21,6 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
public interface IHyracksClientConnection {
public void createApplication(String appName, File harFile) throws Exception;
@@ -36,5 +35,5 @@
public void start(UUID jobId) throws Exception;
- public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+ public void waitForCompletion(UUID jobId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 0c0089f..c4bf0e6 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -20,7 +20,6 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
public interface IHyracksClientInterface extends Remote {
public ClusterControllerInfo getClusterControllerInfo() throws Exception;
@@ -37,5 +36,5 @@
public void start(UUID jobId) throws Exception;
- public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+ public void waitForCompletion(UUID jobId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
index 2b78fbb..f577db2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/IClusterController.java
@@ -18,19 +18,17 @@
import java.util.Map;
import java.util.UUID;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-
public interface IClusterController extends Remote {
public NodeParameters registerNode(INodeController nodeController) throws Exception;
public void unregisterNode(INodeController nodeController) throws Exception;
public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
- StageletStatistics statistics) throws Exception;
+ Map<String, Long> statistics) throws Exception;
public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
public void nodeHeartbeat(String id) throws Exception;
- public void reportProfile(String id, Map<String, Long> counterDump) throws Exception;
+ public void reportProfile(String id, Map<UUID, Map<String, Long>> counterDump) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
deleted file mode 100644
index 8784b46..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.job.statistics;
-
-import java.io.Serializable;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-public class JobStatistics implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private Date startTime;
-
- private Date endTime;
-
- private List<StageStatistics> stages;
-
- public JobStatistics() {
- stages = new ArrayList<StageStatistics>();
- }
-
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- public Date getStartTime() {
- return startTime;
- }
-
- public void setEndTime(Date endTime) {
- this.endTime = endTime;
- }
-
- public Date getEndTime() {
- return endTime;
- }
-
- public void addStageStatistics(StageStatistics stageStatistics) {
- stages.add(stageStatistics);
- }
-
- public List<StageStatistics> getStages() {
- return stages;
- }
-
- @Override
- public String toString() {
- DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- StringBuilder buffer = new StringBuilder();
-
- buffer.append("{\n");
- indent(buffer, 1).append("startTime: '").append(startTime == null ? null : df.format(startTime)).append("',\n");
- indent(buffer, 1).append("endTime: '").append(endTime == null ? null : df.format(endTime)).append("',\n");
- indent(buffer, 1).append("stages: [\n");
- boolean first = true;
- for (StageStatistics ss : stages) {
- if (!first) {
- buffer.append(",\n");
- }
- first = false;
- ss.toString(buffer, 2);
- }
- buffer.append("\n");
- indent(buffer, 1).append("]\n");
- buffer.append("}");
-
- return buffer.toString();
- }
-
- static StringBuilder indent(StringBuilder buffer, int level) {
- for (int i = 0; i < level; ++i) {
- buffer.append(" ");
- }
- return buffer;
- }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageStatistics.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageStatistics.java
deleted file mode 100644
index 6cc437b..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageStatistics.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.job.statistics;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-public class StageStatistics implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private UUID stageId;
-
- private Map<String, StageletStatistics> stageletMap;
-
- public StageStatistics() {
- stageletMap = new HashMap<String, StageletStatistics>();
- }
-
- public void setStageId(UUID stageId) {
- this.stageId = stageId;
- }
-
- public UUID getStageId() {
- return stageId;
- }
-
- public void addStageletStatistics(StageletStatistics ss) {
- stageletMap.put(ss.getNodeId(), ss);
- }
-
- public Map<String, StageletStatistics> getStageletStatistics() {
- return stageletMap;
- }
-
- public void toString(StringBuilder buffer, int level) {
- JobStatistics.indent(buffer, level).append("{\n");
- JobStatistics.indent(buffer, level + 1).append("stageId: '").append(stageId).append("',\n");
- JobStatistics.indent(buffer, level + 1).append("stagelets: {\n");
- boolean first = true;
- for (Map.Entry<String, StageletStatistics> e : stageletMap.entrySet()) {
- if (!first) {
- buffer.append(",\n");
- }
- first = false;
- JobStatistics.indent(buffer, level + 2).append(e.getKey()).append(": ");
- e.getValue().toString(buffer, level + 3);
- }
- buffer.append("\n");
- JobStatistics.indent(buffer, level + 1).append("}\n");
- JobStatistics.indent(buffer, level).append("}");
- }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageletStatistics.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageletStatistics.java
deleted file mode 100644
index 66fb871..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/statistics/StageletStatistics.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.job.statistics;
-
-import java.io.Serializable;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class StageletStatistics implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private String nodeId;
-
- private Date startTime;
-
- private Date endTime;
-
- private Map<String, String> statisticsMap;
-
- public StageletStatistics() {
- statisticsMap = Collections.synchronizedMap(new TreeMap<String, String>());
- }
-
- public void setNodeId(String nodeId) {
- this.nodeId = nodeId;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- public Date getStartTime() {
- return startTime;
- }
-
- public void setEndTime(Date endTime) {
- this.endTime = endTime;
- }
-
- public Date getEndTime() {
- return endTime;
- }
-
- public Map<String, String> getStatisticsMap() {
- return statisticsMap;
- }
-
- public void toString(StringBuilder buffer, int level) {
- DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- buffer.append("{\n");
- JobStatistics.indent(buffer, level + 1).append("nodeId: '").append(nodeId).append("',\n");
- JobStatistics.indent(buffer, level + 1).append("startTime: '").append(df.format(startTime)).append("',\n");
- JobStatistics.indent(buffer, level + 1).append("endTime: '").append(df.format(endTime)).append("',\n");
- JobStatistics.indent(buffer, level + 1).append("statistics: {\n");
- boolean first = true;
- for (Map.Entry<String, String> e : statisticsMap.entrySet()) {
- if (!first) {
- buffer.append(",\n");
- }
- first = false;
- JobStatistics.indent(buffer, level + 2).append(e.getKey()).append(": '").append(e.getValue()).append("'");
- }
- buffer.append("\n");
- JobStatistics.indent(buffer, level + 1).append("}\n");
- JobStatistics.indent(buffer, level).append("}");
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/pom.xml b/hyracks-control-cc/pom.xml
index acdbd2e..1956fb7 100644
--- a/hyracks-control-cc/pom.xml
+++ b/hyracks-control-cc/pom.xml
@@ -32,13 +32,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.wicket</groupId>
- <artifactId>wicket</artifactId>
- <version>1.4.7</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>jol</groupId>
<artifactId>jol</artifactId>
<version>1.0.0</version>
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 723d767..e411517 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -24,6 +24,7 @@
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -55,6 +56,8 @@
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
+import org.json.JSONArray;
+import org.json.JSONObject;
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
@@ -71,9 +74,10 @@
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
+import edu.uci.ics.hyracks.control.cc.web.handlers.util.IJSONOutputFunction;
+import edu.uci.ics.hyracks.control.cc.web.handlers.util.JSONOutputRequestHandler;
+import edu.uci.ics.hyracks.control.cc.web.handlers.util.RoutingHandler;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -117,6 +121,7 @@
webServer = new WebServer();
webServer.addHandler(getAdminConsoleHandler());
webServer.addHandler(getApplicationInstallationHandler());
+ webServer.addHandler(getRestAPIHandler());
this.timer = new Timer(true);
}
@@ -197,14 +202,13 @@
} catch (Exception e) {
}
}
-
});
}
}
@Override
public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
- StageletStatistics statistics) throws Exception {
+ Map<String, Long> statistics) throws Exception {
jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
}
@@ -224,14 +228,48 @@
}
@Override
- public JobStatistics waitForCompletion(UUID jobId) throws Exception {
- return jobManager.waitForCompletion(jobId);
+ public void waitForCompletion(UUID jobId) throws Exception {
+ jobManager.waitForCompletion(jobId);
}
@Override
- public void reportProfile(String id, Map<String, Long> counterDump) throws Exception {
- if (LOGGER.isLoggable(Level.INFO))
+ public void reportProfile(String id, Map<UUID, Map<String, Long>> counterDump) throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Profile: " + id + ": " + counterDump);
+ }
+ jobManager.reportProfile(id, counterDump);
+ }
+
+ private Handler getRestAPIHandler() {
+ ContextHandler handler = new ContextHandler("/state");
+ RoutingHandler rh = new RoutingHandler();
+ rh.addHandler("jobs", new JSONOutputRequestHandler(new IJSONOutputFunction() {
+ @Override
+ public JSONObject invoke(String[] arguments) throws Exception {
+ JSONObject result = new JSONObject();
+ switch (arguments.length) {
+ case 1:
+ if (!"".equals(arguments[0])) {
+ break;
+ }
+ case 0:
+ result.put("result", jobManager.getQueryInterface().getAllJobSummaries());
+ break;
+
+ case 2:
+ UUID jobId = UUID.fromString(arguments[0]);
+
+ if ("spec".equalsIgnoreCase(arguments[1])) {
+ result.put("result", jobManager.getQueryInterface().getJobSpecification(jobId));
+ } else if ("profile".equalsIgnoreCase(arguments[1])) {
+ result.put("result", jobManager.getQueryInterface().getJobProfile(jobId));
+ }
+ }
+ return result;
+ }
+ }));
+ handler.setHandler(rh);
+ return handler;
}
private Handler getAdminConsoleHandler() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
index bb90d0b..1f14b3b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
@@ -15,13 +15,12 @@
package edu.uci.ics.hyracks.control.cc;
import java.util.EnumSet;
+import java.util.Map;
import java.util.UUID;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
public interface IJobManager {
public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
@@ -29,15 +28,19 @@
public void start(UUID jobId) throws Exception;
public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
- StageletStatistics statistics) throws Exception;
+ Map<String, Long> statistics) throws Exception;
public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
public JobStatus getJobStatus(UUID jobId);
- public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+ public void waitForCompletion(UUID jobId) throws Exception;
public void notifyNodeFailure(String nodeId) throws Exception;
public void registerNode(String nodeId) throws Exception;
+
+ public void reportProfile(String id, Map<UUID, Map<String, Long>> counterDump) throws Exception;
+
+ public IJobManagerQueryInterface getQueryInterface();
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManagerQueryInterface.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManagerQueryInterface.java
new file mode 100644
index 0000000..221fc10
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManagerQueryInterface.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc;
+
+import java.util.UUID;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+public interface IJobManagerQueryInterface {
+ public JSONArray getAllJobSummaries() throws Exception;
+
+ public JSONObject getJobSpecification(UUID jobId) throws Exception;
+
+ public JSONObject getJobPlan(UUID jobId) throws Exception;
+
+ public JSONObject getJobProfile(UUID jobId) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
index 22cce67..9ed4f00 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
@@ -36,6 +36,10 @@
import jol.types.table.Function;
import jol.types.table.Key;
import jol.types.table.TableName;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+
import edu.uci.ics.hyracks.api.comm.Endpoint;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
@@ -56,9 +60,6 @@
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
public class JOLJobManagerImpl implements IJobManager {
private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
@@ -113,8 +114,12 @@
private final ExpandPartitionCountConstraintTableFunction expandPartitionCountConstraintFunction;
+ private final ProfileUpdateTable puTable;
+
private final List<String> rankedAvailableNodes;
+ private final IJobManagerQueryInterface qi;
+
public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception {
this.jolRuntime = jolRuntime;
jobQueue = new LinkedBlockingQueue<Runnable>();
@@ -141,6 +146,7 @@
this.abortMessageTable = new AbortMessageTable(jolRuntime);
this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
this.expandPartitionCountConstraintFunction = new ExpandPartitionCountConstraintTableFunction();
+ this.puTable = new ProfileUpdateTable();
this.rankedAvailableNodes = new ArrayList<String>();
jolRuntime.catalog().register(jobTable);
@@ -163,6 +169,7 @@
jolRuntime.catalog().register(abortMessageTable);
jolRuntime.catalog().register(abortNotifyTable);
jolRuntime.catalog().register(expandPartitionCountConstraintFunction);
+ jolRuntime.catalog().register(puTable);
jobTable.register(new JobTable.Callback() {
@Override
@@ -366,6 +373,8 @@
jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
jolRuntime.evaluate();
+
+ qi = new QueryInterfaceImpl();
}
@Override
@@ -476,6 +485,15 @@
}
@Override
+ public void reportProfile(String id, Map<UUID, Map<String, Long>> counterDump) throws Exception {
+ BasicTupleSet puTuples = new BasicTupleSet();
+ for (Map.Entry<UUID, Map<String, Long>> e : counterDump.entrySet()) {
+ puTuples.add(ProfileUpdateTable.createTuple(e.getKey(), id, e.getValue()));
+ }
+ jolRuntime.schedule(JOL_SCOPE, ProfileUpdateTable.TABLE_NAME, puTuples, null);
+ }
+
+ @Override
public JobStatus getJobStatus(UUID jobId) {
synchronized (jobTable) {
try {
@@ -530,7 +548,7 @@
@Override
public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
- StageletStatistics statistics) throws Exception {
+ Map<String, Long> statistics) throws Exception {
BasicTupleSet scTuples = new BasicTupleSet();
scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
@@ -582,14 +600,31 @@
}
@Override
- public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+ public void waitForCompletion(UUID jobId) throws Exception {
synchronized (jobTable) {
Tuple jobTuple = null;
while ((jobTuple = jobTable.lookupJob(jobId)) != null
&& jobTuple.value(JobTable.JOBSTATUS_FIELD_INDEX) != JobStatus.TERMINATED) {
jobTable.wait();
}
- return jobTuple == null ? null : jobTable.buildJobStatistics(jobTuple);
+ }
+ }
+
+ @Override
+ public IJobManagerQueryInterface getQueryInterface() {
+ return qi;
+ }
+
+ public void visitJobs(ITupleProcessor processor) throws Exception {
+ for (Tuple t : jobTable.tuples()) {
+ processor.process(t);
+ }
+ }
+
+ public void visitJob(UUID jobId, ITupleProcessor processor) throws Exception {
+ Tuple job = jobTable.lookupJob(jobId);
+ if (job != null) {
+ processor.process(job);
}
}
@@ -603,7 +638,7 @@
@SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] { UUID.class, String.class, JobStatus.class,
- JobSpecification.class, JobPlan.class, Set.class };
+ JobSpecification.class, JobPlan.class, Map.class };
public static final int JOBID_FIELD_INDEX = 0;
public static final int APPNAME_FIELD_INDEX = 1;
@@ -618,24 +653,7 @@
@SuppressWarnings("unchecked")
static Tuple createInitialJobTuple(UUID jobId, String appName, JobSpecification jobSpec, JobPlan plan) {
- return new Tuple(jobId, appName, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
- }
-
- @SuppressWarnings("unchecked")
- JobStatistics buildJobStatistics(Tuple jobTuple) {
- Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple
- .value(JobTable.STATISTICS_FIELD_INDEX);
- JobStatistics stats = new JobStatistics();
- if (statsSet != null) {
- for (Set<StageletStatistics> stageStatsSet : statsSet) {
- StageStatistics stageStats = new StageStatistics();
- for (StageletStatistics stageletStats : stageStatsSet) {
- stageStats.addStageletStatistics(stageletStats);
- }
- stats.addStageStatistics(stageStats);
- }
- }
- return stats;
+ return new Tuple(jobId, appName, JobStatus.INITIALIZED, jobSpec, plan, new HashMap());
}
Tuple lookupJob(UUID jobId) throws BadKeyException {
@@ -876,7 +894,7 @@
}
/*
- * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
+ * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
*/
private static class StageletCompleteTable extends BasicTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete");
@@ -885,14 +903,14 @@
@SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class,
- StageletStatistics.class };
+ Map.class };
public StageletCompleteTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt,
- StageletStatistics statistics) {
+ Map<String, Long> statistics) {
return new Tuple(jobId, stageId, nodeId, attempt, statistics);
}
}
@@ -1044,6 +1062,24 @@
}
}
+ /*
+ * declare(profileupdate, keys(0, 1), {JobId, NodeId, Map})
+ */
+ private static class ProfileUpdateTable extends EventTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "profileupdate");
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] { UUID.class, String.class, Map.class };
+
+ public ProfileUpdateTable() {
+ super(TABLE_NAME, SCHEMA);
+ }
+
+ public static Tuple createTuple(UUID jobId, String nodeId, Map<String, Long> statistics) {
+ return new Tuple(jobId, nodeId, statistics);
+ }
+ }
+
private class JobQueueThread extends Thread {
public JobQueueThread() {
setDaemon(true);
@@ -1065,4 +1101,78 @@
}
}
}
+
+ public interface ITupleProcessor {
+ public void process(Tuple t) throws Exception;
+ }
+
+ private class QueryInterfaceImpl implements IJobManagerQueryInterface {
+ @Override
+ public JSONArray getAllJobSummaries() throws Exception {
+ final JSONArray jobs = new JSONArray();
+ JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() {
+ @Override
+ public void process(Tuple t) throws Exception {
+ JSONObject jo = new JSONObject();
+ jo.put("type", "job-summary");
+ jo.put("id", t.value(JobTable.JOBID_FIELD_INDEX).toString());
+ jo.put("status", t.value(JobTable.JOBSTATUS_FIELD_INDEX).toString());
+ jobs.put(jo);
+ }
+ };
+ visitJobs(tp);
+ return jobs;
+ }
+
+ @Override
+ public JSONObject getJobSpecification(UUID jobId) throws Exception {
+ final JSONArray jobs = new JSONArray();
+ JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() {
+ @Override
+ public void process(Tuple t) throws Exception {
+ JobSpecification js = (JobSpecification) t.value(JobTable.JOBSPEC_FIELD_INDEX);
+ jobs.put(js.toJSON());
+ }
+ };
+ visitJob(jobId, tp);
+ return (JSONObject) (jobs.length() == 0 ? new JSONObject() : jobs.get(0));
+ }
+
+ @Override
+ public JSONObject getJobPlan(UUID jobId) throws Exception {
+ final JSONArray jobs = new JSONArray();
+ JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() {
+ @Override
+ public void process(Tuple t) throws Exception {
+ }
+ };
+ visitJob(jobId, tp);
+ return (JSONObject) (jobs.length() == 0 ? new JSONObject() : jobs.get(0));
+ }
+
+ @Override
+ public JSONObject getJobProfile(UUID jobId) throws Exception {
+ final JSONArray jobs = new JSONArray();
+ JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() {
+ @Override
+ public void process(Tuple t) throws Exception {
+ JSONObject jo = new JSONObject();
+ jo.put("type", "profile");
+ jo.put("id", t.value(JobTable.JOBID_FIELD_INDEX).toString());
+ Map<String, Long> profile = (Map<String, Long>) t.value(JobTable.STATISTICS_FIELD_INDEX);
+ if (profile != null) {
+ for (Map.Entry<String, Long> e : profile.entrySet()) {
+ JSONObject jpe = new JSONObject();
+ jpe.put("name", e.getKey());
+ jpe.put("value", e.getValue());
+ jo.accumulate("counters", jpe);
+ }
+ }
+ jobs.put(jo);
+ }
+ };
+ visitJob(jobId, tp);
+ return (JSONObject) (jobs.length() == 0 ? new JSONObject() : jobs.get(0));
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java
index 438e235..102a011 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/StageProgress.java
@@ -18,20 +18,14 @@
import java.util.Set;
import java.util.UUID;
-import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
-
public class StageProgress {
private final UUID stageId;
private final Set<String> pendingNodes;
- private final StageStatistics stageStatistics;
-
public StageProgress(UUID stageId) {
this.stageId = stageId;
pendingNodes = new HashSet<String>();
- stageStatistics = new StageStatistics();
- stageStatistics.setStageId(stageId);
}
public UUID getStageId() {
@@ -49,8 +43,4 @@
public boolean stageComplete() {
return pendingNodes.isEmpty();
}
-
- public StageStatistics getStageStatistics() {
- return stageStatistics;
- }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/IJSONOutputFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/IJSONOutputFunction.java
new file mode 100644
index 0000000..75edf2b
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/IJSONOutputFunction.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.web.handlers.util;
+
+import org.json.JSONObject;
+
+public interface IJSONOutputFunction {
+ public JSONObject invoke(String[] arguments) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/JSONOutputRequestHandler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/JSONOutputRequestHandler.java
new file mode 100644
index 0000000..c6b8d9c
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/JSONOutputRequestHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.web.handlers.util;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.json.JSONObject;
+
+public class JSONOutputRequestHandler extends AbstractHandler {
+ private final IJSONOutputFunction fn;
+
+ public JSONOutputRequestHandler(IJSONOutputFunction fn) {
+ this.fn = fn;
+ }
+
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ while (target.startsWith("/")) {
+ target = target.substring(1);
+ }
+ while (target.endsWith("/")) {
+ target = target.substring(0, target.length() - 1);
+ }
+ String[] parts = target.split("/");
+ try {
+ JSONObject result = fn.invoke(parts);
+ response.setContentType("application/json");
+ result.write(response.getWriter());
+ baseRequest.setHandled(true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/RoutingHandler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/RoutingHandler.java
new file mode 100644
index 0000000..6afaf2a
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/handlers/util/RoutingHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.web.handlers.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+public class RoutingHandler extends AbstractHandler {
+ private final Map<String, Handler> handlers;
+
+ public RoutingHandler() {
+ handlers = new HashMap<String, Handler>();
+ }
+
+ public synchronized void addHandler(String route, Handler handler) {
+ handlers.put(route, handler);
+ }
+
+ public synchronized void removeHandler(String route) {
+ handlers.remove(route);
+ }
+
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ while (target.startsWith("/")) {
+ target = target.substring(1);
+ }
+ int idx = target.indexOf('/');
+ String path0 = target;
+ String rest = "";
+ if (idx >= 0) {
+ path0 = target.substring(0, idx);
+ rest = target.substring(idx);
+ }
+ Handler h;
+ synchronized (this) {
+ h = handlers.get(path0);
+ }
+ if (h != null) {
+ h.handle(rest, baseRequest, request, response);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
index 46188da..a717128 100644
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
@@ -2,6 +2,8 @@
import java.util.UUID;
import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
import jol.types.basic.Tuple;
import jol.types.basic.TupleSet;
@@ -95,16 +97,23 @@
NextAttempt := Attempt + 1;
};
+job(JobId, AppName, Status, JobSpec, JobPlan, Stats) :-
+ job(JobId, AppName, Status, JobSpec, JobPlan, Stats),
+ profileupdate(JobId, _, JobletStats)
+ {
+ Stats.putAll(JobletStats);
+ };
+
define(stagestart, keys(), {UUID, Integer, Integer});
-define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
+define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Map});
watch(jobstart, i);
stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
jobattempt#insert(JobId, Attempt);
-update_job_status_RUNNING job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
- job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
+update_job_status_RUNNING job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, Stats) :-
+ job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, Stats),
jobstart(JobId, _);
stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
@@ -287,22 +296,33 @@
stageabort(JobId, StageId, Attempt, NodeIdSet2),
NodeIdSet1.size() == NodeIdSet2.size();
-define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+define(stageletcompletecount, keys(0, 1, 2), {UUID, UUID, Integer, Integer});
-stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
+stageletcompletecount(JobId, StageId, Attempt, count<NodeId>) :-
+ stageletcomplete(JobId, StageId, NodeId, Attempt, _);
+
+define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Map});
+
+stageletcomplete_agg(JobId, StageId, Attempt, generic<(new HashMap()).putAll(Statistics)>) :-
stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
-stagefinish(JobId, StageNumber, Attempt, SSet) :-
+stagefinish(JobId, StageNumber, Attempt, SMap) :-
startmessage_agg(JobId, StageId, Attempt, _, _, TSet),
- stageletcomplete_agg(JobId, StageId, Attempt, SSet),
+ stageletcompletecount(JobId, StageId, Attempt, Count),
+ stageletcomplete_agg(JobId, StageId, Attempt, SMap),
jobstage(JobId, StageNumber, StageId),
- TSet.size() == SSet.size();
+ TSet.size() == Count;
-update_job_status_TERMINATED job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
- job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
+update_job_status_TERMINATED job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, NewStats) :-
+ job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, Stats),
stagestart#insert(JobId, StageNumber, Attempt),
- stagefinish(JobId, _, Attempt, SSet),
- notin jobstage(JobId, StageNumber);
+ stagefinish(JobId, _, Attempt, SMap),
+ notin jobstage(JobId, StageNumber)
+ {
+ NewStats := new HashMap();
+ NewStats.putAll(Stats);
+ NewStats.putAll(SMap);
+ };
define(jobcleanup_agg, {UUID, Set});
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index f83a3c1..b48be8f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
public class Joblet {
private static final long serialVersionUID = 1L;
@@ -89,7 +88,7 @@
return nodeController.getExecutor();
}
- public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletStatistics stats)
+ public synchronized void notifyStageletComplete(UUID stageId, int attempt, Map<String, Long> stats)
throws Exception {
stageletMap.remove(stageId);
nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index d9c5cda..3690ed7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -31,8 +31,6 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -75,7 +73,6 @@
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -464,7 +461,7 @@
}
}
- public void notifyStageComplete(UUID jobId, UUID stageId, int attempt, StageletStatistics stats) throws Exception {
+ public void notifyStageComplete(UUID jobId, UUID stageId, int attempt, Map<String, Long> stats) throws Exception {
try {
ccs.notifyStageletComplete(jobId, stageId, attempt, id, stats);
} catch (Exception e) {
@@ -519,21 +516,25 @@
@Override
public void run() {
try {
- Map<String, Long> counterDump = new TreeMap<String, Long>();
+ Map<UUID, Map<String, Long>> counterDump = new HashMap<UUID, Map<String, Long>>();
Set<UUID> jobIds;
synchronized (NodeControllerService.this) {
jobIds = new HashSet<UUID>(jobletMap.keySet());
}
- for(UUID jobId : jobIds) {
+ for (UUID jobId : jobIds) {
Joblet ji;
synchronized (NodeControllerService.this) {
ji = jobletMap.get(jobId);
}
if (ji != null) {
- ji.dumpProfile(counterDump);
+ Map<String, Long> jobletCounterDump = new HashMap<String, Long>();
+ ji.dumpProfile(jobletCounterDump);
+ counterDump.put(jobId, jobletCounterDump);
}
}
- cc.reportProfile(id, counterDump);
+ if (!counterDump.isEmpty()) {
+ cc.reportProfile(id, counterDump);
+ }
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index afbb95d..0f53f26 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -15,12 +15,12 @@
package edu.uci.ics.hyracks.control.nc;
import java.rmi.RemoteException;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -28,7 +28,6 @@
import edu.uci.ics.hyracks.api.comm.Endpoint;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
import edu.uci.ics.hyracks.control.nc.job.profiling.CounterContext;
import edu.uci.ics.hyracks.control.nc.runtime.OperatorRunnable;
@@ -55,8 +54,6 @@
private final Set<OperatorInstanceId> pendingOperators;
- private final StageletStatistics stats;
-
public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId) throws RemoteException {
this.joblet = joblet;
this.stageId = stageId;
@@ -65,8 +62,6 @@
started = false;
honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
stageletCounterContext = new CounterContext(joblet.getJobId() + "." + stageId + "." + nodeId);
- stats = new StageletStatistics();
- stats.setNodeId(nodeId);
}
public void setOperator(OperatorDescriptorId odId, int partition, OperatorRunnable hon) {
@@ -94,7 +89,6 @@
throw new Exception("Joblet already started");
}
started = true;
- stats.setStartTime(new Date());
notifyAll();
}
@@ -145,8 +139,9 @@
protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
pendingOperators.remove(opIId);
if (pendingOperators.isEmpty()) {
- stats.setEndTime(new Date());
try {
+ Map<String, Long> stats = new TreeMap<String, Long>();
+ dumpProfile(stats);
joblet.notifyStageletComplete(stageId, attempt, stats);
} catch (Exception e) {
e.printStackTrace();
@@ -169,10 +164,6 @@
}
}
- public StageletStatistics getStatistics() {
- return stats;
- }
-
public void dumpProfile(Map<String, Long> counterDump) {
stageletCounterContext.dump(counterDump);
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index c0b6d34..858ff6c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -42,6 +42,7 @@
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
ccConfig.port = 39001;
+ ccConfig.profileDumpPeriod = 1000;
ccConfig.useJOL = true;
cc = new ClusterControllerService(ccConfig);
cc.start();
@@ -72,12 +73,12 @@
nc1.stop();
cc.stop();
}
-
+
protected void runTest(JobSpecification spec) throws Exception {
UUID jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
System.err.println(spec.toJSON());
hcc.start(jobId);
System.err.print(jobId);
- System.err.println(cc.waitForCompletion(jobId));
+ cc.waitForCompletion(jobId);
}
}
\ No newline at end of file