Add ability to configure network ports for NCs and CC.

This includes the ability to specify different "public" ports, for situations
when a node is behind a NAT firewall.
Also eliminate IP check for data and dataset network addresses, which
should allow DNS names to be used as well.
Some internal cleanup regarding handling network addresses.

Change-Id: I9947fe3cec59daef3458cdc14d33c9353449da27
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/204
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Tested-by: Ian Maxon <imaxon@uci.edu>
diff --git a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index fd59278..b97053a 100644
--- a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -46,7 +46,6 @@
         ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_NET_PORT;
         ccConfig.clusterNetIpAddress = "127.0.0.1";
         ccConfig.clusterNetPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
-        // ccConfig.useJOL = true;
         cc = new ClusterControllerService(ccConfig);
         cc.start();
 
@@ -55,7 +54,7 @@
         ncConfig1.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
         ncConfig1.clusterNetIPAddress = "127.0.0.1";
         ncConfig1.dataIPAddress = "127.0.0.1";
-        ncConfig1.datasetIPAddress = "127.0.0.1";
+        ncConfig1.resultIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -65,7 +64,7 @@
         ncConfig2.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
         ncConfig2.clusterNetIPAddress = "127.0.0.1";
         ncConfig2.dataIPAddress = "127.0.0.1";
-        ncConfig2.datasetIPAddress = "127.0.0.1";
+        ncConfig2.resultIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
index 3bb7d22..0f1d2ab 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -18,13 +18,17 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 
 import edu.uci.ics.hyracks.api.io.IWritable;
 
 public final class NetworkAddress implements IWritable, Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
+    private String address;
+    // Cached locally, not serialized
     private byte[] ipAddress;
 
     private int port;
@@ -36,15 +40,24 @@
     }
 
     private NetworkAddress() {
-
+        ipAddress = null;
     }
 
-    public NetworkAddress(byte[] ipAddress, int port) {
-        this.ipAddress = ipAddress;
+    public NetworkAddress(String address, int port) {
+        this.address = address;
         this.port = port;
+        ipAddress = null;
     }
 
-    public byte[] getIpAddress() {
+    public String getAddress() {
+        return address;
+    }
+
+    public byte[] lookupIpAddress() throws UnknownHostException {
+        if (ipAddress == null) {
+            InetAddress addr = InetAddress.getByName(address);
+            ipAddress = addr.getAddress();
+        }
         return ipAddress;
     }
 
@@ -54,12 +67,12 @@
 
     @Override
     public String toString() {
-        return Arrays.toString(ipAddress) + ":" + port;
+        return address + ":" + port;
     }
 
     @Override
     public int hashCode() {
-        return Arrays.hashCode(ipAddress) + port;
+        return address.hashCode() + port;
     }
 
     @Override
@@ -68,21 +81,18 @@
             return false;
         }
         NetworkAddress on = (NetworkAddress) o;
-        return on.port == port && Arrays.equals(on.ipAddress, ipAddress);
+        return on.port == port && on.address == address;
     }
 
     @Override
     public void writeFields(DataOutput output) throws IOException {
-        output.writeInt(ipAddress.length);
-        output.write(ipAddress);
+        output.writeUTF(address);
         output.writeInt(port);
     }
 
     @Override
     public void readFields(DataInput input) throws IOException {
-        int size = input.readInt();
-        ipAddress = new byte[size];
-        input.readFully(ipAddress);
+        address = input.readUTF();
         port = input.readInt();
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
index 86d5d1b..513357b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.api.context;
 
+import java.net.InetAddress;
 import java.util.Map;
 import java.util.Set;
 
@@ -23,7 +24,7 @@
 public interface ICCContext {
     public ClusterControllerInfo getClusterControllerInfo();
 
-    public void getIPAddressNodeMap(Map<String, Set<String>> map) throws Exception;
+    public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws Exception;
 
     public ClusterTopology getClusterTopology();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
index e74f844..46226ba 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -33,8 +33,8 @@
 
     public HyracksDataset(IHyracksClientConnection hcc, int frameSize, int nReaders) throws Exception {
         NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo();
-        datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection(new String(
-                ddsAddress.getIpAddress()), ddsAddress.getPort());
+        datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection
+            (ddsAddress.getAddress(), ddsAddress.getPort());
 
         netManager = new ClientNetworkManager(nReaders);
         netManager.start();
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 187358c..062b5bf 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -174,7 +174,7 @@
 
     private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
         NetworkAddress netAddr = addr.getNetworkAddress();
-        return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
+        return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort());
     }
 
     private IDatasetInputChannelMonitor getMonitor(int partition) throws HyracksException {
diff --git a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java
index a58aa9c..73ca5da 100644
--- a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java
+++ b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java
@@ -65,7 +65,7 @@
         ncConfig1.clusterNetIPAddress = "localhost";
         ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig1.dataIPAddress = "127.0.0.1";
-        ncConfig1.datasetIPAddress = "127.0.0.1";
+        ncConfig1.resultIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -75,7 +75,7 @@
         ncConfig2.clusterNetIPAddress = "localhost";
         ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig2.dataIPAddress = "127.0.0.1";
-        ncConfig2.datasetIPAddress = "127.0.0.1";
+        ncConfig2.resultIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index c6ea57e..8ec3669 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -16,6 +16,7 @@
 
 import java.io.File;
 import java.io.FileReader;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -113,7 +114,7 @@
 
     private final Map<String, NodeControllerState> nodeRegistry;
 
-    private final Map<String, Set<String>> ipAddressNodeNameMap;
+    private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
 
     private final ServerContext serverCtx;
 
@@ -154,7 +155,7 @@
         File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
         jobLog = new LogFile(jobLogFolder);
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
-        ipAddressNodeNameMap = new HashMap<String, Set<String>>();
+        ipAddressNodeNameMap = new HashMap<InetAddress, Set<String>>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
         IIPCI ccIPCI = new ClusterControllerIPCI();
         clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
@@ -185,7 +186,7 @@
         final ClusterTopology topology = computeClusterTopology(ccConfig);
         ccContext = new ICCContext() {
             @Override
-            public void getIPAddressNodeMap(Map<String, Set<String>> map) throws Exception {
+            public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws Exception {
                 GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(ClusterControllerService.this, map);
                 workQueue.scheduleAndSync(ginmw);
             }
@@ -286,7 +287,7 @@
         return runMapHistory;
     }
 
-    public Map<String, Set<String>> getIpAddressNodeNameMap() {
+    public Map<InetAddress, Set<String>> getIpAddressNodeNameMap() {
         return ipAddressNodeNameMap;
     }
 
@@ -331,7 +332,7 @@
     }
 
     public NetworkAddress getDatasetDirectoryServiceInfo() {
-        return new NetworkAddress(ccConfig.clientNetIpAddress.getBytes(), ccConfig.clientNetPort);
+        return new NetworkAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
     }
 
     private class DeadNodeSweeper extends TimerTask {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
index e194338..029ab78 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.net.InetAddress;
 import java.util.Map;
 import java.util.Set;
 
@@ -22,9 +23,9 @@
 
 public class GetIpAddressNodeNameMapWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
-    private Map<String, Set<String>> map;
+    private Map<InetAddress, Set<String>> map;
 
-    public GetIpAddressNodeNameMapWork(ClusterControllerService ccs, Map<String, Set<String>> map) {
+    public GetIpAddressNodeNameMapWork(ClusterControllerService ccs, Map<InetAddress, Set<String>> map) {
         this.ccs = ccs;
         this.map = map;
     }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index acaf324..084a430 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -58,14 +59,19 @@
                 throw new Exception("Node with this name already registered.");
             }
             nodeMap.put(id, state);
-            Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
+            Map<InetAddress, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
+            // QQQ Breach of encapsulation here - way too much duplicated data
+            // in NodeRegistration
             String ipAddress = state.getNCConfig().dataIPAddress;
+            if (state.getNCConfig().dataPublicIPAddress != null) {
+                ipAddress = state.getNCConfig().dataPublicIPAddress;
+            }
             ncConfiguration = new HashMap<String, String>();
             state.getNCConfig().toMap(ncConfiguration);
             Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
             if (nodes == null) {
                 nodes = new HashSet<String>();
-                ipAddressNodeNameMap.put(ipAddress, nodes);
+                ipAddressNodeNameMap.put(InetAddress.getByName(ipAddress), nodes);
             }
             nodes.add(id);
             LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
@@ -81,4 +87,4 @@
         ncIPCHandle.send(-1, result, null);
         ccs.getApplicationContext().notifyNodeJoin(id, ncConfiguration);
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index f6c1e54..21da850 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -34,7 +34,7 @@
     @Option(name = "-cluster-net-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
     public int clusterNetPort = 1099;
 
-    @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 19001)")
+    @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 16001)")
     public int httpPort = 16001;
 
     @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 74e9710..5fa1859 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -28,20 +28,47 @@
     @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
     public String ccHost;
 
-    @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
+    @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)", required = false)
     public int ccPort = 1099;
 
     @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
     public String clusterNetIPAddress;
 
+    @Option(name = "-cluster-net-port", usage = "IP port to bind cluster listener (default: random port)", required = false)
+    public int clusterNetPort = 0;
+
+    @Option(name = "-cluster-net-public-ip-address", usage = "Public IP Address to announce cluster listener (default: same as -cluster-net-ip-address)", required = false)
+    public String clusterNetPublicIPAddress;
+
+    @Option(name = "-cluster-net-public-port", usage = "Public IP port to announce cluster listener (default: same as -cluster-net-port; must set -cluser-net-public-ip-address also)", required = false)
+    public int clusterNetPublicPort = 0;
+
     @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
     public String nodeId;
 
     @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
     public String dataIPAddress;
 
+    @Option(name = "-data-port", usage = "IP port to bind data listener (default: random port)", required = false)
+    public int dataPort = 0;
+
+    @Option(name = "-data-public-ip-address", usage = "Public IP Address to announce data listener (default: same as -data-ip-address)", required = false)
+    public String dataPublicIPAddress;
+
+    @Option(name = "-data-public-port", usage = "Public IP port to announce data listener (default: same as -data-port; must set -data-public-ip-address also)", required = false)
+    public int dataPublicPort = 0;
+
     @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
-    public String datasetIPAddress;
+    public String resultIPAddress;
+
+    @Option(name = "-result-port", usage = "IP port to bind dataset result distribution listener (default: random port)", required = false)
+    public int resultPort = 0;
+
+    @Option(name = "-result-public-ip-address", usage = "Public IP Address to announce dataset result distribution listener (default: same as -result-ip-address)", required = false)
+    public String resultPublicIPAddress;
+
+    @Option(name = "-result-public-port", usage = "Public IP port to announce dataset result distribution listener (default: same as -result-port; must set -result-public-ip-address also)", required = false)
+    public int resultPublicPort = 0;
 
     @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
     public String ioDevices = System.getProperty("java.io.tmpdir");
@@ -78,11 +105,30 @@
         cList.add(String.valueOf(ccPort));
         cList.add("-cluster-net-ip-address");
         cList.add(clusterNetIPAddress);
+        cList.add("-cluster-net-port");
+        cList.add(String.valueOf(clusterNetPort));
+        cList.add("-cluster-net-public-ip-address");
+        cList.add(clusterNetPublicIPAddress);
+        cList.add("-cluster-net-public-port");
+        cList.add(String.valueOf(clusterNetPublicPort));
         cList.add("-node-id");
         cList.add(nodeId);
         cList.add("-data-ip-address");
         cList.add(dataIPAddress);
-        cList.add(datasetIPAddress);
+        cList.add("-data-port");
+        cList.add(String.valueOf(dataPort));
+        cList.add("-data-public-ip-address");
+        cList.add(dataPublicIPAddress);
+        cList.add("-data-public-port");
+        cList.add(String.valueOf(dataPublicPort));
+        cList.add("-result-ip-address");
+        cList.add(resultIPAddress);
+        cList.add("-result-port");
+        cList.add(String.valueOf(resultPort));
+        cList.add("-result-public-ip-address");
+        cList.add(resultPublicIPAddress);
+        cList.add("-result-public-port");
+        cList.add(String.valueOf(resultPublicPort));
         cList.add("-iodevices");
         cList.add(ioDevices);
         cList.add("-net-thread-count");
@@ -114,8 +160,18 @@
         configuration.put("cc-host", ccHost);
         configuration.put("cc-port", (String.valueOf(ccPort)));
         configuration.put("cluster-net-ip-address", clusterNetIPAddress);
+        configuration.put("cluster-net-port", String.valueOf(clusterNetPort));
+        configuration.put("cluster-net-public-ip-address", clusterNetPublicIPAddress);
+        configuration.put("cluster-net-public-port", String.valueOf(clusterNetPublicPort));
         configuration.put("node-id", nodeId);
         configuration.put("data-ip-address", dataIPAddress);
+        configuration.put("data-port", String.valueOf(dataPort));
+        configuration.put("data-public-ip-address", dataPublicIPAddress);
+        configuration.put("data-public-port", String.valueOf(dataPublicPort));
+        configuration.put("result-ip-address", resultIPAddress);
+        configuration.put("result-port", String.valueOf(resultPort));
+        configuration.put("result-public-ip-address", resultPublicIPAddress);
+        configuration.put("result-public-port", String.valueOf(resultPublicPort));
         configuration.put("iodevices", ioDevices);
         configuration.put("net-thread-count", String.valueOf(nNetThreads));
         configuration.put("net-buffer-count", String.valueOf(nNetBuffers));
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index a1eb22e..365cf2e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1296,18 +1296,14 @@
     }
 
     private static NetworkAddress readNetworkAddress(DataInputStream dis) throws IOException {
-        int bLen = dis.readInt();
-        byte[] ipAddress = new byte[bLen];
-        dis.read(ipAddress);
+        String address = dis.readUTF();
         int port = dis.readInt();
-        NetworkAddress networkAddress = new NetworkAddress(ipAddress, port);
+        NetworkAddress networkAddress = new NetworkAddress(address, port);
         return networkAddress;
     }
 
     private static void writeNetworkAddress(DataOutputStream dos, NetworkAddress networkAddress) throws IOException {
-        byte[] ipAddress = networkAddress.getIpAddress();
-        dos.writeInt(ipAddress.length);
-        dos.write(ipAddress);
+        dos.writeUTF(networkAddress.getAddress());
         dos.writeInt(networkAddress.getPort());
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index e9f55fb..02cdd70 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -45,6 +45,7 @@
 
 import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -161,7 +162,7 @@
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
         NodeControllerIPCI ipci = new NodeControllerIPCI();
-        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
+        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), ipci,
                 new CCNCFunctions.SerializerDeserializer());
 
         this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices)));
@@ -169,8 +170,8 @@
             throw new Exception("id not set");
         }
         partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads,
-                ncConfig.nNetBuffers);
+        netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, ncConfig.nNetThreads,
+                                        ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
 
         lccm = new LifeCycleComponentManager();
         queue = new WorkQueue();
@@ -242,10 +243,11 @@
 
     private void init() throws Exception {
         ctx.getIOManager().setExecutor(executor);
-        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
-                ncConfig.resultTTL, ncConfig.resultSweepThreshold);
-        datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
-                datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers);
+        datasetPartitionManager = new DatasetPartitionManager
+            (this, executor, ncConfig.resultManagerMemory, ncConfig.resultTTL, ncConfig.resultSweepThreshold);
+        datasetNetworkManager = new DatasetNetworkManager
+            (ncConfig.resultIPAddress, ncConfig.resultPort, datasetPartitionManager,
+             ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress, ncConfig.resultPublicPort);
     }
 
     @Override
@@ -265,8 +267,14 @@
             gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
         }
         HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
-                datasetNetworkManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean
+        // Use "public" versions of network addresses and ports
+        NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
+        NetworkAddress netAddress = netManager.getPublicNetworkAddress();
+        if (ncConfig.dataPublicIPAddress != null) {
+            netAddress = new NetworkAddress(ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
+        }
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
+                datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean
                         .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
                         .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
                         .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
@@ -378,22 +386,6 @@
         return queue;
     }
 
-    private static InetAddress getIpAddress(String ipaddrStr) throws Exception {
-        ipaddrStr = ipaddrStr.trim();
-        Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
-        Matcher m = pattern.matcher(ipaddrStr);
-        if (!m.matches()) {
-            throw new Exception(MessageFormat.format(
-                    "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
-        }
-        byte[] ipBytes = new byte[4];
-        ipBytes[0] = (byte) Integer.parseInt(m.group(1));
-        ipBytes[1] = (byte) Integer.parseInt(m.group(2));
-        ipBytes[2] = (byte) Integer.parseInt(m.group(3));
-        ipBytes[3] = (byte) Integer.parseInt(m.group(4));
-        return InetAddress.getByAddress(ipBytes);
-    }
-
     private class HeartbeatTask extends TimerTask {
         private IClusterController cc;
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index b1f4147..190fd28 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -97,8 +97,9 @@
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
             boolean orderedResult, boolean emptyResult) throws HyracksException {
         try {
+            // Be sure to send the *public* network address to the CC
             ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
-                    partition, nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+                    partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress());
         } catch (Exception e) {
             throw new HyracksException(e);
         }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
index 348a37c5..784c177 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -15,7 +15,6 @@
 package edu.uci.ics.hyracks.control.nc.net;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -50,24 +49,53 @@
 
     private final int nBuffers;
 
-    private NetworkAddress networkAddress;
+    private NetworkAddress localNetworkAddress;
 
-    public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads,
-            int nBuffers) throws IOException {
+    private NetworkAddress publicNetworkAddress;
+
+    /**
+     * @param inetAddress - Internet address to bind the listen port to
+     * @param inetPort - Port to bind on inetAddress
+     * @param publicInetAddress - Internet address to report to consumers;
+     *    useful when behind NAT. null = same as inetAddress
+     * @param publicInetPort - Port to report to consumers; useful when
+     *    behind NAT. Ignored if publicInetAddress is null. 0 = same as inetPort
+     */
+    public DatasetNetworkManager(String inetAddress, int inetPort, IDatasetPartitionManager partitionManager, int nThreads,
+                                 int nBuffers, String publicInetAddress, int publicInetPort) throws IOException {
         this.partitionManager = partitionManager;
         this.nBuffers = nBuffers;
-        md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
+        md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
                 MAX_CONNECTION_ATTEMPTS);
+        // Just save these values for the moment; may be reset in start()
+        publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
     }
 
     public void start() throws IOException {
         md.start();
         InetSocketAddress sockAddr = md.getLocalAddress();
-        networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
+        localNetworkAddress = new NetworkAddress(sockAddr.getHostString(), sockAddr.getPort());
+
+        // See if the public address was explicitly specified, and if not,
+        // make it a copy of localNetworkAddress
+        if (publicNetworkAddress.getAddress() == null) {
+            publicNetworkAddress = localNetworkAddress;
+        }
+        else {
+            // Likewise for public port
+            if (publicNetworkAddress.getPort() == 0) {
+                publicNetworkAddress = new NetworkAddress
+                    (publicNetworkAddress.getAddress(), sockAddr.getPort());
+            }
+        }
     }
 
-    public NetworkAddress getNetworkAddress() {
-        return networkAddress;
+    public NetworkAddress getLocalNetworkAddress() {
+        return localNetworkAddress;
+    }
+
+    public NetworkAddress getPublicNetworkAddress() {
+        return publicNetworkAddress;
     }
 
     public void stop() {
@@ -129,4 +157,4 @@
     public MuxDemuxPerformanceCounters getPerformanceCounters() {
         return md.getPerformanceCounters();
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index 8791aa1..466d6f2 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -15,7 +15,6 @@
 package edu.uci.ics.hyracks.control.nc.net;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -51,24 +50,46 @@
 
     private final MuxDemux md;
 
-    private NetworkAddress networkAddress;
+    private NetworkAddress localNetworkAddress;
 
-    public NetworkManager(InetAddress inetAddress, PartitionManager partitionManager, int nThreads, int nBuffers)
+    private NetworkAddress publicNetworkAddress;
+
+    public NetworkManager(String inetAddress, int inetPort, PartitionManager partitionManager, int nThreads, int nBuffers,
+                          String publicInetAddress, int publicInetPort)
             throws IOException {
         this.partitionManager = partitionManager;
         this.nBuffers = nBuffers;
-        md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
+        md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
                 MAX_CONNECTION_ATTEMPTS);
+        // Just save these values for the moment; may be reset in start()
+        publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
     }
 
     public void start() throws IOException {
         md.start();
         InetSocketAddress sockAddr = md.getLocalAddress();
-        networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
+        localNetworkAddress = new NetworkAddress(sockAddr.getHostString(), sockAddr.getPort());
+
+        // See if the public address was explicitly specified, and if not,
+        // make it a copy of localNetworkAddress
+        if (publicNetworkAddress.getAddress() == null) {
+            publicNetworkAddress = localNetworkAddress;
+        }
+        else {
+            // Likewise for public port
+            if (publicNetworkAddress.getPort() == 0) {
+                publicNetworkAddress = new NetworkAddress
+                    (publicNetworkAddress.getAddress(), sockAddr.getPort());
+            }
+        }
     }
 
-    public NetworkAddress getNetworkAddress() {
-        return networkAddress;
+    public NetworkAddress getLocalNetworkAddress() {
+        return localNetworkAddress;
+    }
+
+    public NetworkAddress getPublicNetworkAddress() {
+        return publicNetworkAddress;
     }
 
     public void stop() {
@@ -136,4 +157,4 @@
     public MuxDemuxPerformanceCounters getPerformanceCounters() {
         return md.getPerformanceCounters();
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index f70d325..206ff67 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -47,7 +47,7 @@
             Joblet ji = jobletMap.get(pid.getJobId());
             if (ji != null) {
                 PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getNetworkManager(),
-                        new InetSocketAddress(InetAddress.getByAddress(networkAddress.getIpAddress()),
+                        new InetSocketAddress(InetAddress.getByAddress(networkAddress.lookupIpAddress()),
                                 networkAddress.getPort()), pid, 5));
                 ji.reportPartitionAvailability(channel);
             }
@@ -55,4 +55,4 @@
             throw new RuntimeException(e);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 1be5fc6..adea6bb 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -264,7 +264,7 @@
                                 .getTaskAttemptId().getTaskId().getPartition());
                         PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(
                                 ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
-                                        .getIpAddress()), networkAddress.getPort()), pid, 5));
+                                        .lookupIpAddress()), networkAddress.getPort()), pid, 5));
                         channels.add(channel);
                     }
                 }
@@ -273,4 +273,4 @@
         }
         return channelsForInputConnectors;
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 7356644..1150cf3 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -94,7 +94,7 @@
         ncConfig1.ccPort = 39001;
         ncConfig1.clusterNetIPAddress = "127.0.0.1";
         ncConfig1.dataIPAddress = "127.0.0.1";
-        ncConfig1.datasetIPAddress = "127.0.0.1";
+        ncConfig1.resultIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -104,7 +104,7 @@
         ncConfig2.ccPort = 39001;
         ncConfig2.clusterNetIPAddress = "127.0.0.1";
         ncConfig2.dataIPAddress = "127.0.0.1";
-        ncConfig2.datasetIPAddress = "127.0.0.1";
+        ncConfig2.resultIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index ddaa7cf..970f2fe 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -93,7 +93,7 @@
             ncConfig.ccPort = 39001;
             ncConfig.clusterNetIPAddress = "127.0.0.1";
             ncConfig.dataIPAddress = "127.0.0.1";
-            ncConfig.datasetIPAddress = "127.0.0.1";
+            ncConfig.resultIPAddress = "127.0.0.1";
             ncConfig.nodeId = ASTERIX_IDS[i];
             asterixNCs[i] = new NodeControllerService(ncConfig);
             asterixNCs[i].start();
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
index 1f3a167..a6324ad 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.hdfs.scheduler;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -38,7 +39,15 @@
         final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
         for (int i = 0; i < workloads.length; i++) {
             if (workloads[i] < slotLimit) {
-                BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
+                byte[] rawip;
+                try {
+                    rawip = ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress();
+                }
+                catch (UnknownHostException e) {
+                    // QQQ Should probably have a neater solution than this
+                    throw new RuntimeException(e);
+                }
+                BytesWritable ip = new BytesWritable(rawip);
                 IntWritable availableSlot = availableIpsToSlots.get(ip);
                 if (availableSlot == null) {
                     availableSlot = new IntWritable(slotLimit - workloads[i]);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
index 8c5c78a..cb7de30 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -50,7 +50,7 @@
             for (int i = 0; i < NCs.length; i++) {
                 List<Integer> path = new ArrayList<Integer>();
                 String ipAddress = InetAddress.getByAddress(
-                        ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
+                        ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress()).getHostAddress();
                 topology.lookupNetworkTerminal(ipAddress, path);
                 if (path.size() <= 0) {
                     // if the hyracks nc is not in the defined cluster
@@ -87,7 +87,7 @@
                 if (workloads[i] < slotLimit) {
                     List<Integer> path = new ArrayList<Integer>();
                     String ipAddress = InetAddress.getByAddress(
-                            ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
+                            ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress()).getHostAddress();
                     topology.lookupNetworkTerminal(ipAddress, path);
                     if (path.size() <= 0) {
                         // if the hyracks nc is not in the defined cluster
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index 608b76f..167d02b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -379,7 +379,7 @@
              * build the IP address to NC map
              */
             for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
-                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
+                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().lookupIpAddress())
                         .getHostAddress();
                 List<String> matchedNCs = ipToNcMapping.get(ipAddr);
                 if (matchedNCs == null) {
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index ba12d9c..78f2703 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -57,24 +57,12 @@
      */
     public void testSchedulerSimple() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
 
         InputSplit[] fileSplits = new InputSplit[6];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -107,24 +95,12 @@
      */
     public void testSchedulerLargerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.7").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.12").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+        ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", 5099), new NetworkAddress("10.0.0.5", 5098)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -166,24 +142,12 @@
      */
     public void testSchedulerSmallerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -224,24 +188,12 @@
      */
     public void testSchedulerSmallerHDFSOdd() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
 
         InputSplit[] fileSplits = new InputSplit[13];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -284,24 +236,12 @@
      */
     public void testSchedulercBoundary() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
 
         /** test empty file splits */
         InputSplit[] fileSplits = new InputSplit[0];
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
index 4f1fecf..1d0c280 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
@@ -64,7 +64,7 @@
         ncConfig1.clusterNetIPAddress = "localhost";
         ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig1.dataIPAddress = "127.0.0.1";
-        ncConfig1.datasetIPAddress = "127.0.0.1";
+        ncConfig1.resultIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -74,7 +74,7 @@
         ncConfig2.clusterNetIPAddress = "localhost";
         ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig2.dataIPAddress = "127.0.0.1";
-        ncConfig2.datasetIPAddress = "127.0.0.1";
+        ncConfig2.resultIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index 74f1b43..2bea1f7 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -44,24 +44,12 @@
      */
     public void testSchedulerSimple() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -88,24 +76,12 @@
      */
     public void testSchedulerLargerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -139,24 +115,12 @@
      */
     public void testSchedulerSmallerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -190,24 +154,12 @@
      */
     public void testSchedulerSmallerHDFSOdd() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
diff --git a/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java b/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
index 28b49ab..d2c9b90 100644
--- a/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -89,4 +89,4 @@
         return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci,
                 new JavaSerializationBasedPayloadSerializerDeserializer());
     }
-}
\ No newline at end of file
+}