added ability to trigger an application state dump through the rest api
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
index c80f6d1..c94fa9a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
@@ -66,4 +66,11 @@
* @return Memory Manager
*/
public IMemoryManager getMemoryManager();
+
+ /**
+ * Set the handler for state dumps.
+ *
+ * @param handler
+ */
+ public void setStateDumpHandler(IStateDumpHandler handler);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IStateDumpHandler.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IStateDumpHandler.java
new file mode 100644
index 0000000..29d5af4
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IStateDumpHandler.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.hyracks.api.application;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface IStateDumpHandler {
+ public void dumpState(OutputStream os) throws IOException;
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
index 2cb757f..2538510 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
@@ -21,5 +21,7 @@
public void start();
+ public void dumpState(OutputStream os) throws IOException;
+
public void stop(boolean dumpState, OutputStream ouputStream) throws IOException;
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java
index 6247c17..b3b2566 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.lifecycle;
import java.io.IOException;
+import java.io.OutputStream;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Map;
@@ -24,7 +25,11 @@
public void startAll();
+ public void dumpState(OutputStream os) throws IOException;
+
public void stopAll(boolean dumpState) throws IOException;
public void configure(Map<String, String> configuration);
+
+ public String getDumpPath();
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
index b2a0aec..31cbc01 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -140,4 +141,17 @@
configured = true;
}
+ @Override
+ public String getDumpPath() {
+ return dumpPath;
+ }
+
+ @Override
+ public void dumpState(OutputStream os) throws IOException {
+ for (int index = components.size() - 1; index >= 0; index--) {
+ ILifeCycleComponent component = components.get(index);
+ component.dumpState(os);
+ }
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5ec9c19..8184fe6 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -54,6 +54,7 @@
import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.cc.work.CliDeployBinaryWork;
import edu.uci.ics.hyracks.control.cc.work.CliUnDeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobInfoWork;
@@ -65,6 +66,7 @@
import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
import edu.uci.ics.hyracks.control.cc.work.NotifyDeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.NotifyStateDumpResponse;
import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionRequestWork;
@@ -82,6 +84,7 @@
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
import edu.uci.ics.hyracks.control.common.logs.LogFile;
import edu.uci.ics.hyracks.control.common.work.IPCResponder;
@@ -138,6 +141,8 @@
private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
+ private final Map<String, StateDumpRun> stateDumpRunMap;
+
public ClusterControllerService(final CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
@@ -194,6 +199,7 @@
jobCounter = 0;
deploymentRunMap = new HashMap<DeploymentId, DeploymentRun>();
+ stateDumpRunMap = new HashMap<>();
}
private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -549,11 +555,30 @@
}));
return;
}
+
+ case STATE_DUMP_RESPONSE: {
+ CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn;
+ workQueue.schedule(new NotifyStateDumpResponse(ClusterControllerService.this, dsrf.getNodeId(),
+ dsrf.getStateDumpId(), dsrf.getState()));
+ return;
+ }
}
LOGGER.warning("Unknown function: " + fn.getFunctionId());
}
}
+ public synchronized void addStateDumpRun(String id, StateDumpRun sdr) {
+ stateDumpRunMap.put(id, sdr);
+ }
+
+ public synchronized StateDumpRun getStateDumpRun(String id) {
+ return stateDumpRunMap.get(id);
+ }
+
+ public synchronized void removeStateDumpRun(String id) {
+ stateDumpRunMap.remove(id);
+ }
+
/**
* Add a deployment run
*
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/StateDumpRESTAPIFunction.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/StateDumpRESTAPIFunction.java
new file mode 100644
index 0000000..cd79eb3
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/StateDumpRESTAPIFunction.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.control.cc.web;
+
+import java.util.Map;
+
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.web.util.IJSONOutputFunction;
+import edu.uci.ics.hyracks.control.cc.work.GatherStateDumpsWork;
+import edu.uci.ics.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
+
+public class StateDumpRESTAPIFunction implements IJSONOutputFunction {
+ private final ClusterControllerService ccs;
+
+ public StateDumpRESTAPIFunction(ClusterControllerService ccs) {
+ this.ccs = ccs;
+ }
+
+ @Override
+ public JSONObject invoke(String[] arguments) throws Exception {
+ GatherStateDumpsWork gsdw = new GatherStateDumpsWork(ccs);
+ ccs.getWorkQueue().scheduleAndSync(gsdw);
+ StateDumpRun sdr = gsdw.getStateDumpRun();
+ sdr.waitForCompletion();
+
+ JSONObject result = new JSONObject();
+ for (Map.Entry<String, String> e : sdr.getStateDump().entrySet()) {
+ result.put(e.getKey(), e.getValue());
+ }
+ return result;
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
index dc59633..c100111 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
@@ -65,6 +65,7 @@
RoutingHandler rh = new RoutingHandler();
rh.addHandler("jobs", new JSONOutputRequestHandler(new JobsRESTAPIFunction(ccs)));
rh.addHandler("nodes", new JSONOutputRequestHandler(new NodesRESTAPIFunction(ccs)));
+ rh.addHandler("statedump", new JSONOutputRequestHandler(new StateDumpRESTAPIFunction(ccs)));
handler.setHandler(rh);
addHandler(handler);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GatherStateDumpsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GatherStateDumpsWork.java
new file mode 100644
index 0000000..06e3758
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GatherStateDumpsWork.java
@@ -0,0 +1,84 @@
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GatherStateDumpsWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+
+ private final StateDumpRun sdr;
+
+ public GatherStateDumpsWork(ClusterControllerService ccs) {
+ this.ccs = ccs;
+ this.sdr = new StateDumpRun(ccs);
+ }
+
+ @Override
+ public void doRun() throws Exception {
+ ccs.addStateDumpRun(sdr.stateDumpId, sdr);
+ sdr.setNCs(new HashSet<>(ccs.getNodeMap().keySet()));
+ for (NodeControllerState ncs : ccs.getNodeMap().values()) {
+ ncs.getNodeController().dumpState(sdr.stateDumpId);
+ }
+ }
+
+ public StateDumpRun getStateDumpRun() {
+ return sdr;
+ }
+
+ public static class StateDumpRun {
+
+ private final ClusterControllerService ccs;
+
+ private final String stateDumpId;
+
+ private final Map<String, String> ncStates;
+
+ private Set<String> ncIds;
+
+ private boolean complete;
+
+ public StateDumpRun(ClusterControllerService ccs) {
+ this.ccs = ccs;
+ stateDumpId = UUID.randomUUID().toString();
+ ncStates = new HashMap<>();
+ complete = false;
+ }
+
+ public void setNCs(Set<String> ncIds) {
+ this.ncIds = ncIds;
+ }
+
+ public Map<String, String> getStateDump() {
+ return ncStates;
+ }
+
+ public synchronized void notifyStateDumpReceived(String nodeId, String state) {
+ ncIds.remove(nodeId);
+ ncStates.put(nodeId, state);
+ if (ncIds.size() == 0) {
+ complete = true;
+ ccs.removeStateDumpRun(stateDumpId);
+ notifyAll();
+ }
+ }
+
+ public synchronized void waitForCompletion() throws InterruptedException {
+ while (!complete) {
+ wait();
+ }
+ }
+
+ public String getStateDumpId() {
+ return stateDumpId;
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyStateDumpResponse.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyStateDumpResponse.java
new file mode 100644
index 0000000..617bf6a
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyStateDumpResponse.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class NotifyStateDumpResponse extends AbstractWork {
+
+ private final ClusterControllerService ccs;
+
+ private final String stateDumpId;
+
+ private final String nodeId;
+
+ private final String state;
+
+ public NotifyStateDumpResponse(ClusterControllerService ccs, String nodeId, String stateDumpId, String state) {
+ this.ccs = ccs;
+ this.stateDumpId = stateDumpId;
+ this.nodeId = nodeId;
+ this.state = state;
+ }
+
+ @Override
+ public void run() {
+ ccs.getStateDumpRun(stateDumpId).notifyStateDumpReceived(nodeId, state);
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index ec96f2b..926c83f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -44,6 +44,8 @@
public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
+ public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception;
+
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 464e2b2..ec95d58 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -44,4 +44,6 @@
public void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
public void undeployBinary(DeploymentId deploymentId) throws Exception;
+
+ public void dumpState(String stateDumpId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index e5d8e5f..9077cd8 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -88,6 +88,9 @@
NOTIFY_DEPLOY_BINARY,
UNDEPLOY_BINARY,
+ STATE_DUMP_REQUEST,
+ STATE_DUMP_RESPONSE,
+
OTHER
}
@@ -845,6 +848,60 @@
}
}
+ public static class StateDumpRequestFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String stateDumpId;
+
+ public StateDumpRequestFunction(String stateDumpId) {
+ this.stateDumpId = stateDumpId;
+ }
+
+ public String getStateDumpId() {
+ return stateDumpId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.STATE_DUMP_REQUEST;
+ }
+
+ }
+
+ public static class StateDumpResponseFunction extends Function {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ private final String stateDumpId;
+
+ private final String state;
+
+ public StateDumpResponseFunction(String nodeId, String stateDumpId, String state) {
+ this.nodeId = nodeId;
+ this.stateDumpId = stateDumpId;
+ this.state = state;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getStateDumpId() {
+ return stateDumpId;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.STATE_DUMP_RESPONSE;
+ }
+ }
+
public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 0a16087..b52d0ae 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -138,4 +138,11 @@
ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null);
}
+ @Override
+ public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception {
+ CCNCFunctions.StateDumpResponseFunction fn = new CCNCFunctions.StateDumpResponseFunction(nodeId, stateDumpId,
+ state);
+ ipcHandle.send(-1, fn, null);
+ }
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index e078a91..2484a98 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -78,4 +78,10 @@
CCNCFunctions.UnDeployBinaryFunction rpaf = new CCNCFunctions.UnDeployBinaryFunction(deploymentId);
ipcHandle.send(-1, rpaf, null);
}
+
+ @Override
+ public void dumpState(String stateDumpId) throws Exception {
+ CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId);
+ ipcHandle.send(-1, dsf, null);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index f026ca1..bccbc67 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -61,6 +61,7 @@
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.work.FutureValue;
@@ -78,6 +79,7 @@
import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
import edu.uci.ics.hyracks.control.nc.work.DeployBinaryWork;
+import edu.uci.ics.hyracks.control.nc.work.StateDumpWork;
import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
import edu.uci.ics.hyracks.control.nc.work.UnDeployBinaryWork;
@@ -461,7 +463,8 @@
private final class NodeControllerIPCI implements IIPCI {
@Override
- public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
+ Exception exception) {
CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
switch (fn.getFunctionId()) {
case SEND_APPLICATION_MESSAGE: {
@@ -521,6 +524,12 @@
queue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
return;
}
+
+ case STATE_DUMP_REQUEST: {
+ final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
+ queue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
+ return;
+ }
}
throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
index 4ef8d9a..a6c2cc4 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -15,9 +15,11 @@
package edu.uci.ics.hyracks.control.nc.application;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.Serializable;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.IStateDumpHandler;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
import edu.uci.ics.hyracks.api.resources.memory.IMemoryManager;
@@ -31,6 +33,7 @@
private final IHyracksRootContext rootCtx;
private final MemoryManager memoryManager;
private Object appObject;
+ private IStateDumpHandler sdh;
public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String nodeId,
MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager) throws IOException {
@@ -39,6 +42,13 @@
this.nodeId = nodeId;
this.rootCtx = rootCtx;
this.memoryManager = memoryManager;
+ sdh = new IStateDumpHandler() {
+
+ @Override
+ public void dumpState(OutputStream os) throws IOException {
+ lccm.dumpState(os);
+ }
+ };
}
@Override
@@ -56,6 +66,15 @@
}
@Override
+ public void setStateDumpHandler(IStateDumpHandler handler) {
+ this.sdh = handler;
+ }
+
+ public IStateDumpHandler getStateDumpHandler() {
+ return sdh;
+ }
+
+ @Override
public IHyracksRootContext getRootContext() {
return rootCtx;
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StateDumpWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StateDumpWork.java
new file mode 100644
index 0000000..0adea86
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StateDumpWork.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.ByteArrayOutputStream;
+
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class StateDumpWork extends SynchronizableWork {
+ private final NodeControllerService ncs;
+
+ private final String stateDumpId;
+
+ public StateDumpWork(NodeControllerService ncs, String stateDumpId) {
+ this.ncs = ncs;
+ this.stateDumpId = stateDumpId;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ncs.getApplicationContext().getStateDumpHandler().dumpState(baos);
+ ncs.getClusterController().notifyStateDump(ncs.getApplicationContext().getNodeId(), stateDumpId,
+ baos.toString("UTF-8"));
+ }
+}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index 052ea67..8543433 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -215,7 +215,7 @@
}
}
- private void dumpState(OutputStream os) throws IOException {
+ public void dumpState(OutputStream os) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append(String.format("Memory budget = %d\n", memoryBudget));
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 4162dc2..03d57f5 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -802,8 +802,13 @@
@Override
public void stop(boolean dumpState, OutputStream os) throws IOException {
if (dumpState) {
- os.write(dumpState().getBytes());
+ dumpState(os);
}
close();
}
+
+ @Override
+ public void dumpState(OutputStream os) throws IOException {
+ os.write(dumpState().getBytes());
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
index d518122..dd2455e 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
@@ -18,6 +18,7 @@
import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.IStateDumpHandler;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
@@ -89,8 +90,6 @@
@Override
public void setMessageBroker(IMessageBroker staticticsConnector) {
- // TODO Auto-generated method stub
-
}
@Override
@@ -100,7 +99,6 @@
@Override
public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
- // TODO Auto-generated method stub
return null;
}
@@ -111,18 +109,19 @@
@Override
public ThreadFactory getThreadFactory() {
- // TODO Auto-generated method stub
return null;
}
@Override
public void setThreadFactory(ThreadFactory threadFactory) {
- // TODO Auto-generated method stub
-
}
@Override
public ILifeCycleComponentManager getLifeCycleComponentManager() {
return lccm;
}
+
+ @Override
+ public void setStateDumpHandler(IStateDumpHandler handler) {
+ }
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/NoBudgetIndexLifecycleManager.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/NoBudgetIndexLifecycleManager.java
index 5484f78..0c97230 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/NoBudgetIndexLifecycleManager.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/NoBudgetIndexLifecycleManager.java
@@ -178,7 +178,7 @@
}
}
- private void dumpState(OutputStream os) throws IOException {
+ public void dumpState(OutputStream os) throws IOException {
StringBuilder sb = new StringBuilder();
String headerFormat = "%-20s %-10s %-20s %-20s %-20s\n";