Add ability to configure network ports for NCs and CC.
This includes the ability to specify different "public" ports, for situations
when a node is behind a NAT firewall.
Also eliminate IP check for data and dataset network addresses, which
should allow DNS names to be used as well.
Some internal cleanup regarding handling network addresses.
Change-Id: I9947fe3cec59daef3458cdc14d33c9353449da27
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/204
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Tested-by: Ian Maxon <imaxon@uci.edu>
diff --git a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index fd59278..b97053a 100644
--- a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -46,7 +46,6 @@
ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_NET_PORT;
ccConfig.clusterNetIpAddress = "127.0.0.1";
ccConfig.clusterNetPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
- // ccConfig.useJOL = true;
cc = new ClusterControllerService(ccConfig);
cc.start();
@@ -55,7 +54,7 @@
ncConfig1.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
ncConfig1.clusterNetIPAddress = "127.0.0.1";
ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.datasetIPAddress = "127.0.0.1";
+ ncConfig1.resultIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -65,7 +64,7 @@
ncConfig2.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
ncConfig2.clusterNetIPAddress = "127.0.0.1";
ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.datasetIPAddress = "127.0.0.1";
+ ncConfig2.resultIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
index 3bb7d22..0f1d2ab 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -18,13 +18,17 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Arrays;
import edu.uci.ics.hyracks.api.io.IWritable;
public final class NetworkAddress implements IWritable, Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+ private String address;
+ // Cached locally, not serialized
private byte[] ipAddress;
private int port;
@@ -36,15 +40,24 @@
}
private NetworkAddress() {
-
+ ipAddress = null;
}
- public NetworkAddress(byte[] ipAddress, int port) {
- this.ipAddress = ipAddress;
+ public NetworkAddress(String address, int port) {
+ this.address = address;
this.port = port;
+ ipAddress = null;
}
- public byte[] getIpAddress() {
+ public String getAddress() {
+ return address;
+ }
+
+ public byte[] lookupIpAddress() throws UnknownHostException {
+ if (ipAddress == null) {
+ InetAddress addr = InetAddress.getByName(address);
+ ipAddress = addr.getAddress();
+ }
return ipAddress;
}
@@ -54,12 +67,12 @@
@Override
public String toString() {
- return Arrays.toString(ipAddress) + ":" + port;
+ return address + ":" + port;
}
@Override
public int hashCode() {
- return Arrays.hashCode(ipAddress) + port;
+ return address.hashCode() + port;
}
@Override
@@ -68,21 +81,18 @@
return false;
}
NetworkAddress on = (NetworkAddress) o;
- return on.port == port && Arrays.equals(on.ipAddress, ipAddress);
+ return on.port == port && on.address == address;
}
@Override
public void writeFields(DataOutput output) throws IOException {
- output.writeInt(ipAddress.length);
- output.write(ipAddress);
+ output.writeUTF(address);
output.writeInt(port);
}
@Override
public void readFields(DataInput input) throws IOException {
- int size = input.readInt();
- ipAddress = new byte[size];
- input.readFully(ipAddress);
+ address = input.readUTF();
port = input.readInt();
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
index 86d5d1b..513357b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.api.context;
+import java.net.InetAddress;
import java.util.Map;
import java.util.Set;
@@ -23,7 +24,7 @@
public interface ICCContext {
public ClusterControllerInfo getClusterControllerInfo();
- public void getIPAddressNodeMap(Map<String, Set<String>> map) throws Exception;
+ public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws Exception;
public ClusterTopology getClusterTopology();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
index e74f844..46226ba 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -33,8 +33,8 @@
public HyracksDataset(IHyracksClientConnection hcc, int frameSize, int nReaders) throws Exception {
NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo();
- datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection(new String(
- ddsAddress.getIpAddress()), ddsAddress.getPort());
+ datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection
+ (ddsAddress.getAddress(), ddsAddress.getPort());
netManager = new ClientNetworkManager(nReaders);
netManager.start();
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 187358c..062b5bf 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -174,7 +174,7 @@
private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
NetworkAddress netAddr = addr.getNetworkAddress();
- return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
+ return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort());
}
private IDatasetInputChannelMonitor getMonitor(int partition) throws HyracksException {
diff --git a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java
index a58aa9c..73ca5da 100644
--- a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java
+++ b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java
@@ -65,7 +65,7 @@
ncConfig1.clusterNetIPAddress = "localhost";
ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.datasetIPAddress = "127.0.0.1";
+ ncConfig1.resultIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -75,7 +75,7 @@
ncConfig2.clusterNetIPAddress = "localhost";
ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.datasetIPAddress = "127.0.0.1";
+ ncConfig2.resultIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index c6ea57e..8ec3669 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -16,6 +16,7 @@
import java.io.File;
import java.io.FileReader;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -113,7 +114,7 @@
private final Map<String, NodeControllerState> nodeRegistry;
- private final Map<String, Set<String>> ipAddressNodeNameMap;
+ private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
private final ServerContext serverCtx;
@@ -154,7 +155,7 @@
File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
jobLog = new LogFile(jobLogFolder);
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
- ipAddressNodeNameMap = new HashMap<String, Set<String>>();
+ ipAddressNodeNameMap = new HashMap<InetAddress, Set<String>>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
IIPCI ccIPCI = new ClusterControllerIPCI();
clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
@@ -185,7 +186,7 @@
final ClusterTopology topology = computeClusterTopology(ccConfig);
ccContext = new ICCContext() {
@Override
- public void getIPAddressNodeMap(Map<String, Set<String>> map) throws Exception {
+ public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws Exception {
GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(ClusterControllerService.this, map);
workQueue.scheduleAndSync(ginmw);
}
@@ -286,7 +287,7 @@
return runMapHistory;
}
- public Map<String, Set<String>> getIpAddressNodeNameMap() {
+ public Map<InetAddress, Set<String>> getIpAddressNodeNameMap() {
return ipAddressNodeNameMap;
}
@@ -331,7 +332,7 @@
}
public NetworkAddress getDatasetDirectoryServiceInfo() {
- return new NetworkAddress(ccConfig.clientNetIpAddress.getBytes(), ccConfig.clientNetPort);
+ return new NetworkAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
}
private class DeadNodeSweeper extends TimerTask {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
index e194338..029ab78 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
+import java.net.InetAddress;
import java.util.Map;
import java.util.Set;
@@ -22,9 +23,9 @@
public class GetIpAddressNodeNameMapWork extends SynchronizableWork {
private final ClusterControllerService ccs;
- private Map<String, Set<String>> map;
+ private Map<InetAddress, Set<String>> map;
- public GetIpAddressNodeNameMapWork(ClusterControllerService ccs, Map<String, Set<String>> map) {
+ public GetIpAddressNodeNameMapWork(ClusterControllerService ccs, Map<InetAddress, Set<String>> map) {
this.ccs = ccs;
this.map = map;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index acaf324..084a430 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
+import java.net.InetAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -58,14 +59,19 @@
throw new Exception("Node with this name already registered.");
}
nodeMap.put(id, state);
- Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
+ Map<InetAddress, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
+ // QQQ Breach of encapsulation here - way too much duplicated data
+ // in NodeRegistration
String ipAddress = state.getNCConfig().dataIPAddress;
+ if (state.getNCConfig().dataPublicIPAddress != null) {
+ ipAddress = state.getNCConfig().dataPublicIPAddress;
+ }
ncConfiguration = new HashMap<String, String>();
state.getNCConfig().toMap(ncConfiguration);
Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
if (nodes == null) {
nodes = new HashSet<String>();
- ipAddressNodeNameMap.put(ipAddress, nodes);
+ ipAddressNodeNameMap.put(InetAddress.getByName(ipAddress), nodes);
}
nodes.add(id);
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
@@ -81,4 +87,4 @@
ncIPCHandle.send(-1, result, null);
ccs.getApplicationContext().notifyNodeJoin(id, ncConfiguration);
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index f6c1e54..21da850 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -34,7 +34,7 @@
@Option(name = "-cluster-net-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
public int clusterNetPort = 1099;
- @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 19001)")
+ @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 16001)")
public int httpPort = 16001;
@Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 74e9710..5fa1859 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -28,20 +28,47 @@
@Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
public String ccHost;
- @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
+ @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)", required = false)
public int ccPort = 1099;
@Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
public String clusterNetIPAddress;
+ @Option(name = "-cluster-net-port", usage = "IP port to bind cluster listener (default: random port)", required = false)
+ public int clusterNetPort = 0;
+
+ @Option(name = "-cluster-net-public-ip-address", usage = "Public IP Address to announce cluster listener (default: same as -cluster-net-ip-address)", required = false)
+ public String clusterNetPublicIPAddress;
+
+ @Option(name = "-cluster-net-public-port", usage = "Public IP port to announce cluster listener (default: same as -cluster-net-port; must set -cluser-net-public-ip-address also)", required = false)
+ public int clusterNetPublicPort = 0;
+
@Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
public String nodeId;
@Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
public String dataIPAddress;
+ @Option(name = "-data-port", usage = "IP port to bind data listener (default: random port)", required = false)
+ public int dataPort = 0;
+
+ @Option(name = "-data-public-ip-address", usage = "Public IP Address to announce data listener (default: same as -data-ip-address)", required = false)
+ public String dataPublicIPAddress;
+
+ @Option(name = "-data-public-port", usage = "Public IP port to announce data listener (default: same as -data-port; must set -data-public-ip-address also)", required = false)
+ public int dataPublicPort = 0;
+
@Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
- public String datasetIPAddress;
+ public String resultIPAddress;
+
+ @Option(name = "-result-port", usage = "IP port to bind dataset result distribution listener (default: random port)", required = false)
+ public int resultPort = 0;
+
+ @Option(name = "-result-public-ip-address", usage = "Public IP Address to announce dataset result distribution listener (default: same as -result-ip-address)", required = false)
+ public String resultPublicIPAddress;
+
+ @Option(name = "-result-public-port", usage = "Public IP port to announce dataset result distribution listener (default: same as -result-port; must set -result-public-ip-address also)", required = false)
+ public int resultPublicPort = 0;
@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
public String ioDevices = System.getProperty("java.io.tmpdir");
@@ -78,11 +105,30 @@
cList.add(String.valueOf(ccPort));
cList.add("-cluster-net-ip-address");
cList.add(clusterNetIPAddress);
+ cList.add("-cluster-net-port");
+ cList.add(String.valueOf(clusterNetPort));
+ cList.add("-cluster-net-public-ip-address");
+ cList.add(clusterNetPublicIPAddress);
+ cList.add("-cluster-net-public-port");
+ cList.add(String.valueOf(clusterNetPublicPort));
cList.add("-node-id");
cList.add(nodeId);
cList.add("-data-ip-address");
cList.add(dataIPAddress);
- cList.add(datasetIPAddress);
+ cList.add("-data-port");
+ cList.add(String.valueOf(dataPort));
+ cList.add("-data-public-ip-address");
+ cList.add(dataPublicIPAddress);
+ cList.add("-data-public-port");
+ cList.add(String.valueOf(dataPublicPort));
+ cList.add("-result-ip-address");
+ cList.add(resultIPAddress);
+ cList.add("-result-port");
+ cList.add(String.valueOf(resultPort));
+ cList.add("-result-public-ip-address");
+ cList.add(resultPublicIPAddress);
+ cList.add("-result-public-port");
+ cList.add(String.valueOf(resultPublicPort));
cList.add("-iodevices");
cList.add(ioDevices);
cList.add("-net-thread-count");
@@ -114,8 +160,18 @@
configuration.put("cc-host", ccHost);
configuration.put("cc-port", (String.valueOf(ccPort)));
configuration.put("cluster-net-ip-address", clusterNetIPAddress);
+ configuration.put("cluster-net-port", String.valueOf(clusterNetPort));
+ configuration.put("cluster-net-public-ip-address", clusterNetPublicIPAddress);
+ configuration.put("cluster-net-public-port", String.valueOf(clusterNetPublicPort));
configuration.put("node-id", nodeId);
configuration.put("data-ip-address", dataIPAddress);
+ configuration.put("data-port", String.valueOf(dataPort));
+ configuration.put("data-public-ip-address", dataPublicIPAddress);
+ configuration.put("data-public-port", String.valueOf(dataPublicPort));
+ configuration.put("result-ip-address", resultIPAddress);
+ configuration.put("result-port", String.valueOf(resultPort));
+ configuration.put("result-public-ip-address", resultPublicIPAddress);
+ configuration.put("result-public-port", String.valueOf(resultPublicPort));
configuration.put("iodevices", ioDevices);
configuration.put("net-thread-count", String.valueOf(nNetThreads));
configuration.put("net-buffer-count", String.valueOf(nNetBuffers));
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index a1eb22e..365cf2e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1296,18 +1296,14 @@
}
private static NetworkAddress readNetworkAddress(DataInputStream dis) throws IOException {
- int bLen = dis.readInt();
- byte[] ipAddress = new byte[bLen];
- dis.read(ipAddress);
+ String address = dis.readUTF();
int port = dis.readInt();
- NetworkAddress networkAddress = new NetworkAddress(ipAddress, port);
+ NetworkAddress networkAddress = new NetworkAddress(address, port);
return networkAddress;
}
private static void writeNetworkAddress(DataOutputStream dos, NetworkAddress networkAddress) throws IOException {
- byte[] ipAddress = networkAddress.getIpAddress();
- dos.writeInt(ipAddress.length);
- dos.write(ipAddress);
+ dos.writeUTF(networkAddress.getAddress());
dos.writeInt(networkAddress.getPort());
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index e9f55fb..02cdd70 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -45,6 +45,7 @@
import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -161,7 +162,7 @@
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
NodeControllerIPCI ipci = new NodeControllerIPCI();
- ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), ipci,
new CCNCFunctions.SerializerDeserializer());
this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices)));
@@ -169,8 +170,8 @@
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads,
- ncConfig.nNetBuffers);
+ netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, ncConfig.nNetThreads,
+ ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
lccm = new LifeCycleComponentManager();
queue = new WorkQueue();
@@ -242,10 +243,11 @@
private void init() throws Exception {
ctx.getIOManager().setExecutor(executor);
- datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
- ncConfig.resultTTL, ncConfig.resultSweepThreshold);
- datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
- datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers);
+ datasetPartitionManager = new DatasetPartitionManager
+ (this, executor, ncConfig.resultManagerMemory, ncConfig.resultTTL, ncConfig.resultSweepThreshold);
+ datasetNetworkManager = new DatasetNetworkManager
+ (ncConfig.resultIPAddress, ncConfig.resultPort, datasetPartitionManager,
+ ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress, ncConfig.resultPublicPort);
}
@Override
@@ -265,8 +267,14 @@
gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
- ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
- datasetNetworkManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean
+ // Use "public" versions of network addresses and ports
+ NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
+ NetworkAddress netAddress = netManager.getPublicNetworkAddress();
+ if (ncConfig.dataPublicIPAddress != null) {
+ netAddress = new NetworkAddress(ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
+ }
+ ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
+ datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean
.getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
@@ -378,22 +386,6 @@
return queue;
}
- private static InetAddress getIpAddress(String ipaddrStr) throws Exception {
- ipaddrStr = ipaddrStr.trim();
- Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
- Matcher m = pattern.matcher(ipaddrStr);
- if (!m.matches()) {
- throw new Exception(MessageFormat.format(
- "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
- }
- byte[] ipBytes = new byte[4];
- ipBytes[0] = (byte) Integer.parseInt(m.group(1));
- ipBytes[1] = (byte) Integer.parseInt(m.group(2));
- ipBytes[2] = (byte) Integer.parseInt(m.group(3));
- ipBytes[3] = (byte) Integer.parseInt(m.group(4));
- return InetAddress.getByAddress(ipBytes);
- }
-
private class HeartbeatTask extends TimerTask {
private IClusterController cc;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index b1f4147..190fd28 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -97,8 +97,9 @@
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
boolean orderedResult, boolean emptyResult) throws HyracksException {
try {
+ // Be sure to send the *public* network address to the CC
ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
- partition, nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+ partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress());
} catch (Exception e) {
throw new HyracksException(e);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
index 348a37c5..784c177 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.control.nc.net;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -50,24 +49,53 @@
private final int nBuffers;
- private NetworkAddress networkAddress;
+ private NetworkAddress localNetworkAddress;
- public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads,
- int nBuffers) throws IOException {
+ private NetworkAddress publicNetworkAddress;
+
+ /**
+ * @param inetAddress - Internet address to bind the listen port to
+ * @param inetPort - Port to bind on inetAddress
+ * @param publicInetAddress - Internet address to report to consumers;
+ * useful when behind NAT. null = same as inetAddress
+ * @param publicInetPort - Port to report to consumers; useful when
+ * behind NAT. Ignored if publicInetAddress is null. 0 = same as inetPort
+ */
+ public DatasetNetworkManager(String inetAddress, int inetPort, IDatasetPartitionManager partitionManager, int nThreads,
+ int nBuffers, String publicInetAddress, int publicInetPort) throws IOException {
this.partitionManager = partitionManager;
this.nBuffers = nBuffers;
- md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
+ md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
MAX_CONNECTION_ATTEMPTS);
+ // Just save these values for the moment; may be reset in start()
+ publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
}
public void start() throws IOException {
md.start();
InetSocketAddress sockAddr = md.getLocalAddress();
- networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
+ localNetworkAddress = new NetworkAddress(sockAddr.getHostString(), sockAddr.getPort());
+
+ // See if the public address was explicitly specified, and if not,
+ // make it a copy of localNetworkAddress
+ if (publicNetworkAddress.getAddress() == null) {
+ publicNetworkAddress = localNetworkAddress;
+ }
+ else {
+ // Likewise for public port
+ if (publicNetworkAddress.getPort() == 0) {
+ publicNetworkAddress = new NetworkAddress
+ (publicNetworkAddress.getAddress(), sockAddr.getPort());
+ }
+ }
}
- public NetworkAddress getNetworkAddress() {
- return networkAddress;
+ public NetworkAddress getLocalNetworkAddress() {
+ return localNetworkAddress;
+ }
+
+ public NetworkAddress getPublicNetworkAddress() {
+ return publicNetworkAddress;
}
public void stop() {
@@ -129,4 +157,4 @@
public MuxDemuxPerformanceCounters getPerformanceCounters() {
return md.getPerformanceCounters();
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index 8791aa1..466d6f2 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.control.nc.net;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -51,24 +50,46 @@
private final MuxDemux md;
- private NetworkAddress networkAddress;
+ private NetworkAddress localNetworkAddress;
- public NetworkManager(InetAddress inetAddress, PartitionManager partitionManager, int nThreads, int nBuffers)
+ private NetworkAddress publicNetworkAddress;
+
+ public NetworkManager(String inetAddress, int inetPort, PartitionManager partitionManager, int nThreads, int nBuffers,
+ String publicInetAddress, int publicInetPort)
throws IOException {
this.partitionManager = partitionManager;
this.nBuffers = nBuffers;
- md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
+ md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
MAX_CONNECTION_ATTEMPTS);
+ // Just save these values for the moment; may be reset in start()
+ publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
}
public void start() throws IOException {
md.start();
InetSocketAddress sockAddr = md.getLocalAddress();
- networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
+ localNetworkAddress = new NetworkAddress(sockAddr.getHostString(), sockAddr.getPort());
+
+ // See if the public address was explicitly specified, and if not,
+ // make it a copy of localNetworkAddress
+ if (publicNetworkAddress.getAddress() == null) {
+ publicNetworkAddress = localNetworkAddress;
+ }
+ else {
+ // Likewise for public port
+ if (publicNetworkAddress.getPort() == 0) {
+ publicNetworkAddress = new NetworkAddress
+ (publicNetworkAddress.getAddress(), sockAddr.getPort());
+ }
+ }
}
- public NetworkAddress getNetworkAddress() {
- return networkAddress;
+ public NetworkAddress getLocalNetworkAddress() {
+ return localNetworkAddress;
+ }
+
+ public NetworkAddress getPublicNetworkAddress() {
+ return publicNetworkAddress;
}
public void stop() {
@@ -136,4 +157,4 @@
public MuxDemuxPerformanceCounters getPerformanceCounters() {
return md.getPerformanceCounters();
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index f70d325..206ff67 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -47,7 +47,7 @@
Joblet ji = jobletMap.get(pid.getJobId());
if (ji != null) {
PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getNetworkManager(),
- new InetSocketAddress(InetAddress.getByAddress(networkAddress.getIpAddress()),
+ new InetSocketAddress(InetAddress.getByAddress(networkAddress.lookupIpAddress()),
networkAddress.getPort()), pid, 5));
ji.reportPartitionAvailability(channel);
}
@@ -55,4 +55,4 @@
throw new RuntimeException(e);
}
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 1be5fc6..adea6bb 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -264,7 +264,7 @@
.getTaskAttemptId().getTaskId().getPartition());
PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(
ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
- .getIpAddress()), networkAddress.getPort()), pid, 5));
+ .lookupIpAddress()), networkAddress.getPort()), pid, 5));
channels.add(channel);
}
}
@@ -273,4 +273,4 @@
}
return channelsForInputConnectors;
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 7356644..1150cf3 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -94,7 +94,7 @@
ncConfig1.ccPort = 39001;
ncConfig1.clusterNetIPAddress = "127.0.0.1";
ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.datasetIPAddress = "127.0.0.1";
+ ncConfig1.resultIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -104,7 +104,7 @@
ncConfig2.ccPort = 39001;
ncConfig2.clusterNetIPAddress = "127.0.0.1";
ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.datasetIPAddress = "127.0.0.1";
+ ncConfig2.resultIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index ddaa7cf..970f2fe 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -93,7 +93,7 @@
ncConfig.ccPort = 39001;
ncConfig.clusterNetIPAddress = "127.0.0.1";
ncConfig.dataIPAddress = "127.0.0.1";
- ncConfig.datasetIPAddress = "127.0.0.1";
+ ncConfig.resultIPAddress = "127.0.0.1";
ncConfig.nodeId = ASTERIX_IDS[i];
asterixNCs[i] = new NodeControllerService(ncConfig);
asterixNCs[i].start();
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
index 1f3a167..a6324ad 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.hdfs.scheduler;
import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -38,7 +39,15 @@
final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
for (int i = 0; i < workloads.length; i++) {
if (workloads[i] < slotLimit) {
- BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
+ byte[] rawip;
+ try {
+ rawip = ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress();
+ }
+ catch (UnknownHostException e) {
+ // QQQ Should probably have a neater solution than this
+ throw new RuntimeException(e);
+ }
+ BytesWritable ip = new BytesWritable(rawip);
IntWritable availableSlot = availableIpsToSlots.get(ip);
if (availableSlot == null) {
availableSlot = new IntWritable(slotLimit - workloads[i]);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
index 8c5c78a..cb7de30 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -50,7 +50,7 @@
for (int i = 0; i < NCs.length; i++) {
List<Integer> path = new ArrayList<Integer>();
String ipAddress = InetAddress.getByAddress(
- ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
+ ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress()).getHostAddress();
topology.lookupNetworkTerminal(ipAddress, path);
if (path.size() <= 0) {
// if the hyracks nc is not in the defined cluster
@@ -87,7 +87,7 @@
if (workloads[i] < slotLimit) {
List<Integer> path = new ArrayList<Integer>();
String ipAddress = InetAddress.getByAddress(
- ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
+ ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress()).getHostAddress();
topology.lookupNetworkTerminal(ipAddress, path);
if (path.size() <= 0) {
// if the hyracks nc is not in the defined cluster
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index 608b76f..167d02b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -379,7 +379,7 @@
* build the IP address to NC map
*/
for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
- String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
+ String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().lookupIpAddress())
.getHostAddress();
List<String> matchedNCs = ipToNcMapping.get(ipAddr);
if (matchedNCs == null) {
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index ba12d9c..78f2703 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -57,24 +57,12 @@
*/
public void testSchedulerSimple() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+ ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+ ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+ ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+ ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+ ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
InputSplit[] fileSplits = new InputSplit[6];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -107,24 +95,12 @@
*/
public void testSchedulerLargerHDFS() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.7").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.12").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+ ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+ ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+ ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+ ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", 5099), new NetworkAddress("10.0.0.5", 5098)));
+ ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", 5099), new NetworkAddress("10.0.0.5", 5098)));
InputSplit[] fileSplits = new InputSplit[12];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -166,24 +142,12 @@
*/
public void testSchedulerSmallerHDFS() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+ ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+ ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+ ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+ ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+ ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
InputSplit[] fileSplits = new InputSplit[12];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -224,24 +188,12 @@
*/
public void testSchedulerSmallerHDFSOdd() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+ ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+ ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+ ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+ ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+ ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
InputSplit[] fileSplits = new InputSplit[13];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -284,24 +236,12 @@
*/
public void testSchedulercBoundary() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+ ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+ ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+ ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+ ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+ ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
/** test empty file splits */
InputSplit[] fileSplits = new InputSplit[0];
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
index 4f1fecf..1d0c280 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
@@ -64,7 +64,7 @@
ncConfig1.clusterNetIPAddress = "localhost";
ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.datasetIPAddress = "127.0.0.1";
+ ncConfig1.resultIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -74,7 +74,7 @@
ncConfig2.clusterNetIPAddress = "localhost";
ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.datasetIPAddress = "127.0.0.1";
+ ncConfig2.resultIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index 74f1b43..2bea1f7 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -44,24 +44,12 @@
*/
public void testSchedulerSimple() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+ ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+ ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+ ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+ ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+ ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -88,24 +76,12 @@
*/
public void testSchedulerLargerHDFS() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+ ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+ ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+ ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+ ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+ ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -139,24 +115,12 @@
*/
public void testSchedulerSmallerHDFS() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+ ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+ ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+ ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+ ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+ ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -190,24 +154,12 @@
*/
public void testSchedulerSmallerHDFSOdd() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+ ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+ ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+ ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+ ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+ ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
diff --git a/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java b/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
index 28b49ab..d2c9b90 100644
--- a/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -89,4 +89,4 @@
return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci,
new JavaSerializationBasedPayloadSerializerDeserializer());
}
-}
\ No newline at end of file
+}