Merged hyracks_asterix_vldb_demo -r 1860:1862
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1971 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
index a94c6de..c4989c5 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
@@ -14,8 +14,13 @@
*/
package edu.uci.ics.hyracks.api.context;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.io.IIOManager;
public interface IHyracksRootContext {
public IIOManager getIOManager();
+
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 32b031d..5a33891 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -70,6 +70,7 @@
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
import edu.uci.ics.hyracks.control.common.logs.LogFile;
import edu.uci.ics.hyracks.control.common.work.IPCResponder;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.IIPCI;
@@ -352,7 +353,8 @@
private class ClusterControllerIPCI implements IIPCI {
@Override
- public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
+ Exception exception) {
CCNCFunctions.Function fn = (Function) payload;
switch (fn.getFunctionId()) {
case REGISTER_NODE: {
@@ -425,6 +427,23 @@
.getAppName(), rsf.getNodeId()));
return;
}
+
+ case GET_NODE_CONTROLLERS_INFO: {
+ workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
+ new IResultCallback<Map<String, NodeControllerInfo>>() {
+ @Override
+ public void setValue(Map<String, NodeControllerInfo> result) {
+ new IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
+ .setValue(new CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
+ }
+
+ @Override
+ public void setException(Exception e) {
+
+ }
+ }));
+ return;
+ }
}
LOGGER.warning("Unknown function: " + fn.getFunctionId());
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index a25250a..0c5bb2f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -18,7 +18,6 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.messages.IMessage;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
@@ -50,4 +49,6 @@
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception;
+
+ public void getNodeControllerInfos() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 8f0056f..557a8cb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -28,6 +28,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -77,6 +78,8 @@
DESTROY_APPLICATION,
REPORT_PARTITION_AVAILABILITY,
SEND_APPLICATION_MESSAGE,
+ GET_NODE_CONTROLLERS_INFO,
+ GET_NODE_CONTROLLERS_INFO_RESPONSE,
OTHER
}
@@ -643,6 +646,34 @@
}
}
+ public static class GetNodeControllersInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_NODE_CONTROLLERS_INFO;
+ }
+ }
+
+ public static class GetNodeControllersInfoResponseFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final Map<String, NodeControllerInfo> ncInfos;
+
+ public GetNodeControllersInfoResponseFunction(Map<String, NodeControllerInfo> ncInfos) {
+ this.ncInfos = ncInfos;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_NODE_CONTROLLERS_INFO_RESPONSE;
+ }
+
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() {
+ return ncInfos;
+ }
+ }
+
public static class ReportPartitionAvailabilityFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index d789768..bbaab4e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -103,7 +103,13 @@
@Override
public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
- CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, appName, nodeId);
+ CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data,
+ appName, nodeId);
ipcHandle.send(-1, fn, null);
}
+
+ @Override
+ public void getNodeControllerInfos() throws Exception {
+ ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 58a173c..0195143 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -40,6 +40,10 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -122,6 +126,8 @@
private final OperatingSystemMXBean osMXBean;
+ private final Mutable<FutureValue<Map<String, NodeControllerInfo>>> getNodeControllerInfosAcceptor;
+
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
@@ -129,7 +135,7 @@
NodeControllerIPCI ipci = new NodeControllerIPCI();
ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
new CCNCFunctions.SerializerDeserializer());
- this.ctx = new RootHyracksContext(new IOManager(getDevices(ncConfig.ioDevices), executor));
+ this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices), executor));
if (id == null) {
throw new Exception("id not set");
}
@@ -148,6 +154,7 @@
runtimeMXBean = ManagementFactory.getRuntimeMXBean();
osMXBean = ManagementFactory.getOperatingSystemMXBean();
registrationPending = true;
+ getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
}
public IHyracksRootContext getRootContext() {
@@ -171,6 +178,28 @@
notifyAll();
}
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+ FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
+ synchronized (getNodeControllerInfosAcceptor) {
+ while (getNodeControllerInfosAcceptor.getValue() != null) {
+ getNodeControllerInfosAcceptor.wait();
+ }
+ getNodeControllerInfosAcceptor.setValue(fv);
+ }
+ ccs.getNodeControllerInfos();
+ return fv.get();
+ }
+
+ private void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) {
+ FutureValue<Map<String, NodeControllerInfo>> fv;
+ synchronized (getNodeControllerInfosAcceptor) {
+ fv = getNodeControllerInfosAcceptor.getValue();
+ getNodeControllerInfosAcceptor.setValue(null);
+ getNodeControllerInfosAcceptor.notifyAll();
+ }
+ fv.setValue(ncInfos);
+ }
+
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
@@ -418,6 +447,12 @@
setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
return;
}
+
+ case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
+ CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+ setNodeControllersInfo(gncirf.getNodeControllerInfos());
+ return;
+ }
}
throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
index 9e42277..4651149 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -14,13 +14,20 @@
*/
package edu.uci.ics.hyracks.control.nc.runtime;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
public class RootHyracksContext implements IHyracksRootContext {
+ private final NodeControllerService ncs;
+
private final IIOManager ioManager;
- public RootHyracksContext(IIOManager ioManager) {
+ public RootHyracksContext(NodeControllerService ncs, IIOManager ioManager) {
+ this.ncs = ncs;
this.ioManager = ioManager;
}
@@ -28,4 +35,9 @@
public IIOManager getIOManager() {
return ioManager;
}
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+ return ncs.getNodeControllersInfo();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
index 927d21c..6a9dab7 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
@@ -48,6 +48,8 @@
case '-':
sign = -1;
+ pre = false;
+ break;
case '0':
case '1':
@@ -60,6 +62,7 @@
case '8':
case '9':
pre = false;
+ n = n * 10 + (ch - '0');
break;
default:
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
index e195036..2fe5ded 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
@@ -17,8 +17,10 @@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executors;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IIOManager;
@@ -38,4 +40,9 @@
public IIOManager getIOManager() {
return ioManager;
}
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+ return null;
+ }
}
\ No newline at end of file