Create a separate datasetNetworkManager with its own socket (ipaddress and port)
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2514 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 167eb4b..9b34810 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -37,6 +37,9 @@
@Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
public String dataIPAddress;
+ @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
+ public String datasetIPAddress;
+
@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
public String ioDevices = System.getProperty("java.io.tmpdir");
@@ -66,6 +69,7 @@
cList.add(nodeId);
cList.add("-data-ip-address");
cList.add(dataIPAddress);
+ cList.add(datasetIPAddress);
cList.add("-iodevices");
cList.add(ioDevices);
cList.add("-dcache-client-servers");
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
index 91cfecf..a897602 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -33,6 +33,8 @@
private final NetworkAddress dataPort;
+ private final NetworkAddress datasetPort;
+
private final String osName;
private final String arch;
@@ -60,13 +62,14 @@
private final HeartbeatSchema hbSchema;
public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
- String osName, String arch, String osVersion, int nProcessors, String vmName, String vmVersion,
- String vmVendor, String classpath, String libraryPath, String bootClasspath, List<String> inputArguments,
- Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
+ NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
+ String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
+ List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
this.ncAddress = ncAddress;
this.nodeId = nodeId;
this.ncConfig = ncConfig;
this.dataPort = dataPort;
+ this.datasetPort = datasetPort;
this.osName = osName;
this.arch = arch;
this.osVersion = osVersion;
@@ -98,6 +101,10 @@
return dataPort;
}
+ public NetworkAddress getDatasetPort() {
+ return datasetPort;
+ }
+
public String getOSName() {
return osName;
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index d6af71b..3a41e48 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -64,6 +64,7 @@
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
import edu.uci.ics.hyracks.control.nc.dataset.DatasetPartitionManager;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.net.DatasetNetworkManager;
import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
@@ -98,6 +99,8 @@
private final IDatasetPartitionManager datasetPartitionManager;
+ private final DatasetNetworkManager datasetNetworkManager;
+
private final WorkQueue queue;
private final Timer timer;
@@ -144,8 +147,11 @@
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
- datasetPartitionManager = new DatasetPartitionManager(this);
+ netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
+
+ datasetPartitionManager = new DatasetPartitionManager(this, executor);
+ datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress), datasetPartitionManager,
+ ncConfig.nNetThreads);
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
@@ -210,6 +216,7 @@
LOGGER.log(Level.INFO, "Starting NodeControllerService");
ipc.start();
netManager.start();
+ datasetNetworkManager.start();
IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -218,10 +225,11 @@
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
- osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
- runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
- .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
- runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+ datasetNetworkManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean
+ .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
+ .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
+ .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
+ runtimeMXBean.getSystemProperties(), hbSchema));
synchronized (this) {
while (registrationPending) {
@@ -252,8 +260,10 @@
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
executor.shutdownNow();
partitionManager.close();
+ datasetPartitionManager.close();
heartbeatTask.cancel();
netManager.stop();
+ datasetNetworkManager.stop();
queue.stop();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
}
@@ -278,6 +288,10 @@
return netManager;
}
+ public DatasetNetworkManager getDatasetNetworkManager() {
+ return datasetNetworkManager;
+ }
+
public PartitionManager getPartitionManager() {
return partitionManager;
}
@@ -302,8 +316,7 @@
return queue;
}
- private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
- String ipaddrStr = ncConfig.dataIPAddress;
+ private static InetAddress getIpAddress(String ipaddrStr) throws Exception {
ipaddrStr = ipaddrStr.trim();
Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
Matcher m = pattern.matcher(ipaddrStr);