Cleaned up partition registration maps. Refactored controller registration

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@453 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
index c181501..4bf3da2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCDriver.java
@@ -16,7 +16,7 @@
 
 import org.kohsuke.args4j.CmdLineParser;
 
-import edu.uci.ics.hyracks.control.common.base.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 
 public class CCDriver {
     public static void main(String args[]) throws Exception {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index d0561ea..56002a8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -66,12 +66,13 @@
 import edu.uci.ics.hyracks.control.cc.scheduler.IJobScheduler;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
-import edu.uci.ics.hyracks.control.common.base.CCConfig;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
 import edu.uci.ics.hyracks.control.common.base.INodeController;
-import edu.uci.ics.hyracks.control.common.base.NCConfig;
-import edu.uci.ics.hyracks.control.common.base.NodeParameters;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 
@@ -194,10 +195,12 @@
     }
 
     @Override
-    public NodeParameters registerNode(INodeController nodeController) throws Exception {
-        String id = nodeController.getId();
-        NCConfig ncConfig = nodeController.getConfiguration();
-        NodeControllerState state = new NodeControllerState(nodeController, ncConfig);
+    public NodeParameters registerNode(NodeRegistration reg) throws Exception {
+        INodeController nodeController = reg.getNodeController();
+        String id = reg.getNodeId();
+        NCConfig ncConfig = reg.getNCConfig();
+        NetworkAddress dataPort = reg.getDataPort();
+        NodeControllerState state = new NodeControllerState(nodeController, ncConfig, dataPort);
         jobQueue.scheduleAndSync(new RegisterNodeEvent(this, id, state));
         nodeController.notifyRegistration(this);
         LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
@@ -293,8 +296,8 @@
     }
 
     @Override
-    public void registerPartitionProvider(PartitionId pid, NetworkAddress address) throws Exception {
-        jobQueue.schedule(new RegisterPartitionAvailibilityEvent(this, pid, address));
+    public void registerPartitionProvider(PartitionId pid, String nodeId) throws Exception {
+        jobQueue.schedule(new RegisterPartitionAvailibilityEvent(this, pid, nodeId));
     }
 
     @Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 933504e..e0cee6d 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -18,21 +18,25 @@
 import java.util.Set;
 import java.util.UUID;
 
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.control.common.base.INodeController;
-import edu.uci.ics.hyracks.control.common.base.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 
 public class NodeControllerState {
     private final INodeController nodeController;
 
     private final NCConfig ncConfig;
 
+    private final NetworkAddress dataPort;
+
     private final Set<UUID> activeJobIds;
 
     private int lastHeartbeatDuration;
 
-    public NodeControllerState(INodeController nodeController, NCConfig ncConfig) {
+    public NodeControllerState(INodeController nodeController, NCConfig ncConfig, NetworkAddress dataPort) {
         this.nodeController = nodeController;
         this.ncConfig = ncConfig;
+        this.dataPort = dataPort;
         activeJobIds = new HashSet<UUID>();
     }
 
@@ -59,4 +63,8 @@
     public Set<UUID> getActiveJobIds() {
         return activeJobIds;
     }
+
+    public NetworkAddress getDataPort() {
+        return dataPort;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 359bf2e..6c851cc 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -20,7 +20,6 @@
 import java.util.Set;
 import java.util.UUID;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
@@ -34,9 +33,9 @@
 
     private final JobActivityGraph jag;
 
-    private final Map<PartitionId, NetworkAddress> partitionAvailabilityMap;
+    private final Map<PartitionId, Set<String>> partitionAvailabilityMap;
 
-    private final Map<PartitionId, String> partitionRequestorMap;
+    private final Map<PartitionId, Set<String>> partitionRequestorMap;
 
     private final Set<String> participatingNodeIds;
 
@@ -53,8 +52,8 @@
     public JobRun(UUID jobId, JobActivityGraph plan) {
         this.jobId = jobId;
         this.jag = plan;
-        partitionAvailabilityMap = new HashMap<PartitionId, NetworkAddress>();
-        partitionRequestorMap = new HashMap<PartitionId, String>();
+        partitionAvailabilityMap = new HashMap<PartitionId, Set<String>>();
+        partitionRequestorMap = new HashMap<PartitionId, Set<String>>();
         participatingNodeIds = new HashSet<String>();
         profile = new JobProfile(jobId);
         activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
@@ -108,11 +107,11 @@
         return jsm;
     }
 
-    public Map<PartitionId, NetworkAddress> getPartitionAvailabilityMap() {
+    public Map<PartitionId, Set<String>> getPartitionAvailabilityMap() {
         return partitionAvailabilityMap;
     }
 
-    public Map<PartitionId, String> getPartitionRequestorMap() {
+    public Map<PartitionId, Set<String>> getPartitionRequestorMap() {
         return partitionRequestorMap;
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
index 530d00a..1bcc13b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
@@ -14,9 +14,10 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -27,13 +28,12 @@
 public class RegisterPartitionAvailibilityEvent extends AbstractEvent {
     private final ClusterControllerService ccs;
     private final PartitionId pid;
-    private final NetworkAddress networkAddress;
+    private final String nodeId;
 
-    public RegisterPartitionAvailibilityEvent(ClusterControllerService ccs, PartitionId pid,
-            NetworkAddress networkAddress) {
+    public RegisterPartitionAvailibilityEvent(ClusterControllerService ccs, PartitionId pid, String nodeId) {
         this.ccs = ccs;
         this.pid = pid;
-        this.networkAddress = networkAddress;
+        this.nodeId = nodeId;
     }
 
     @Override
@@ -42,19 +42,27 @@
         if (run == null) {
             return;
         }
-        Map<PartitionId, NetworkAddress> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
-        partitionAvailabilityMap.put(pid, networkAddress);
+        Map<PartitionId, Set<String>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
+        Set<String> paSet = partitionAvailabilityMap.get(pid);
+        if (paSet == null) {
+            paSet = new HashSet<String>();
+            partitionAvailabilityMap.put(pid, paSet);
+        }
+        paSet.add(nodeId);
 
-        Map<PartitionId, String> partitionRequestorMap = run.getPartitionRequestorMap();
-        String requestor = partitionRequestorMap.remove(pid);
-        if (requestor != null) {
-            NodeControllerState ncs = ccs.getNodeMap().get(requestor);
-            if (ncs != null) {
-                try {
-                    INodeController nc = ncs.getNodeController();
-                    nc.reportPartitionAvailability(pid, networkAddress);
-                } catch (Exception e) {
-                    e.printStackTrace();
+        NodeControllerState availNcs = ccs.getNodeMap().get(nodeId);
+        Map<PartitionId, Set<String>> partitionRequestorMap = run.getPartitionRequestorMap();
+        Set<String> prSet = partitionRequestorMap.get(pid);
+        if (prSet != null) {
+            for (String requestor : prSet) {
+                NodeControllerState ncs = ccs.getNodeMap().get(requestor);
+                if (ncs != null) {
+                    try {
+                        INodeController nc = ncs.getNodeController();
+                        nc.reportPartitionAvailability(pid, availNcs.getDataPort());
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
                 }
             }
         }
@@ -62,6 +70,6 @@
 
     @Override
     public String toString() {
-        return "PartitionAvailable@" + networkAddress + "[" + pid + "]";
+        return "PartitionAvailable@" + nodeId + "[" + pid + "]";
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
index 0055946..e578f77 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
@@ -15,7 +15,9 @@
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -47,18 +49,32 @@
                     return;
                 }
 
-                Map<PartitionId, NetworkAddress> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
-                NetworkAddress networkAddress = partitionAvailabilityMap.get(pid);
-                if (networkAddress != null) {
-                    try {
-                        INodeController nc = ncs.getNodeController();
-                        nc.reportPartitionAvailability(pid, networkAddress);
-                    } catch (Exception e) {
-                        e.printStackTrace();
+                Map<PartitionId, Set<String>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
+                Set<String> paSet = partitionAvailabilityMap.get(pid);
+                boolean matched = false;
+                if (paSet != null && !paSet.isEmpty()) {
+                    for (String availablePartitionLocation : paSet) {
+                        NodeControllerState availNcs = ccs.getNodeMap().get(availablePartitionLocation);
+                        if (availNcs != null) {
+                            NetworkAddress networkAddress = availNcs.getDataPort();
+                            try {
+                                INodeController nc = ncs.getNodeController();
+                                nc.reportPartitionAvailability(pid, networkAddress);
+                                matched = true;
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                            }
+                        }
                     }
-                } else {
-                    Map<PartitionId, String> partitionRequestorMap = run.getPartitionRequestorMap();
-                    partitionRequestorMap.put(pid, nodeId);
+                }
+                if (!matched) {
+                    Map<PartitionId, Set<String>> partitionRequestorMap = run.getPartitionRequestorMap();
+                    Set<String> prSet = partitionRequestorMap.get(pid);
+                    if (prSet == null) {
+                        prSet = new HashSet<String>();
+                        partitionRequestorMap.put(pid, prSet);
+                    }
+                    prSet.add(nodeId);
                 }
             }
         }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 1d95390..9791161 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -19,18 +19,20 @@
 import java.util.List;
 import java.util.UUID;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 
 public interface IClusterController extends Remote {
-    public NodeParameters registerNode(INodeController nodeController) throws Exception;
+    public NodeParameters registerNode(NodeRegistration reg) throws Exception;
 
     public void unregisterNode(INodeController nodeController) throws Exception;
 
-    public void notifyTaskComplete(UUID jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) throws Exception;
+    public void notifyTaskComplete(UUID jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+            throws Exception;
 
     public void notifyTaskFailure(UUID jobId, TaskAttemptId taskId, String nodeId, Exception e) throws Exception;
 
@@ -38,7 +40,7 @@
 
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
 
-    public void registerPartitionProvider(PartitionId pid, NetworkAddress address) throws Exception;
+    public void registerPartitionProvider(PartitionId pid, String nodeId) throws Exception;
 
     public void registerPartitionRequest(Collection<PartitionId> requiredPartitionIds, String nodeId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index eade061..ca2fad9 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -24,6 +24,8 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeCapability;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
 public interface INodeController extends Remote {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/CCConfig.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
similarity index 96%
rename from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/CCConfig.java
rename to hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index 0ae1d71..d12d037 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/CCConfig.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.common.base;
+package edu.uci.ics.hyracks.control.common.controllers;
 
 import org.kohsuke.args4j.Option;
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/NCConfig.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
similarity index 97%
rename from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/NCConfig.java
rename to hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 54139c5..c887223 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/NCConfig.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.common.base;
+package edu.uci.ics.hyracks.control.common.controllers;
 
 import java.io.Serializable;
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/NodeCapability.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeCapability.java
similarity index 94%
rename from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/NodeCapability.java
rename to hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeCapability.java
index 74893c6..7ad5fa1 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/NodeCapability.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeCapability.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.common.base;
+package edu.uci.ics.hyracks.control.common.controllers;
 
 import java.io.Serializable;
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/NodeParameters.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
similarity index 96%
rename from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/NodeParameters.java
rename to hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
index 786e719..0161f96 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/NodeParameters.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.common.base;
+package edu.uci.ics.hyracks.control.common.controllers;
 
 import java.io.Serializable;
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
new file mode 100644
index 0000000..374cf48
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.controllers;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+
+public final class NodeRegistration implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final INodeController nc;
+
+    private final String nodeId;
+
+    private final NCConfig ncConfig;
+
+    private final NetworkAddress dataPort;
+
+    public NodeRegistration(INodeController nc, String nodeId, NCConfig ncConfig, NetworkAddress dataPort) {
+        this.nc = nc;
+        this.nodeId = nodeId;
+        this.ncConfig = ncConfig;
+        this.dataPort = dataPort;
+    }
+
+    public INodeController getNodeController() {
+        return nc;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public NCConfig getNCConfig() {
+        return ncConfig;
+    }
+
+    public NetworkAddress getDataPort() {
+        return dataPort;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
index f413168..dde7abc 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
@@ -18,7 +18,7 @@
 
 import edu.uci.ics.dcache.client.DCacheClient;
 import edu.uci.ics.dcache.client.DCacheClientConfig;
-import edu.uci.ics.hyracks.control.common.base.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 
 public class NCDriver {
     public static void main(String args[]) throws Exception {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 7a2d9af..a26988f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -70,10 +70,11 @@
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
 import edu.uci.ics.hyracks.control.common.base.INodeController;
-import edu.uci.ics.hyracks.control.common.base.NCConfig;
-import edu.uci.ics.hyracks.control.common.base.NodeCapability;
-import edu.uci.ics.hyracks.control.common.base.NodeParameters;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeCapability;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
@@ -129,7 +130,7 @@
         }
         nodeCapability = computeNodeCapability();
         connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
-        partitionManager = new PartitionManager(this, connectionManager.getNetworkAddress());
+        partitionManager = new PartitionManager(this);
         connectionManager.setPartitionRequestListener(partitionManager);
 
         jobletMap = new Hashtable<UUID, Joblet>();
@@ -159,7 +160,8 @@
         connectionManager.start();
         Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
         IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
-        this.nodeParameters = cc.registerNode(this);
+        this.nodeParameters = cc.registerNode(new NodeRegistration(this, id, ncConfig, connectionManager
+                .getNetworkAddress()));
 
         // Schedule heartbeat generator.
         timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index 649ec46..e5e23bb 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -22,7 +22,6 @@
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
@@ -34,8 +33,6 @@
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
 public class PartitionManager implements IPartitionRequestListener {
-    private final NetworkAddress dataPort;
-
     private final NodeControllerService ncs;
 
     private final Map<PartitionId, List<IPartition>> partitionMap;
@@ -44,8 +41,7 @@
 
     private final IWorkspaceFileFactory fileFactory;
 
-    public PartitionManager(NodeControllerService ncs, NetworkAddress dataPort) {
-        this.dataPort = dataPort;
+    public PartitionManager(NodeControllerService ncs) {
         this.ncs = ncs;
         partitionMap = new HashMap<PartitionId, List<IPartition>>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
@@ -62,7 +58,7 @@
             pList.add(partition);
         }
         try {
-            ncs.getClusterController().registerPartitionProvider(pid, dataPort);
+            ncs.getClusterController().registerPartitionProvider(pid, ncs.getId());
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 39ca448..40b40db 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -25,8 +25,8 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.base.CCConfig;
-import edu.uci.ics.hyracks.control.common.base.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 
 public abstract class AbstractIntegrationTest {