diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 36e348b..abadc7e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -17,7 +17,8 @@
 import java.io.Serializable;
 import java.util.EnumSet;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 
@@ -182,11 +183,15 @@
         private static final long serialVersionUID = 1L;
 
         private final JobId jobId;
-        private final NetworkAddress[] knownLocations;
 
-        public GetDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations) {
+        private final ResultSetId rsId;
+
+        private final DatasetDirectoryRecord[] knownRecords;
+
+        public GetDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) {
             this.jobId = jobId;
-            this.knownLocations = knownLocations;
+            this.rsId = rsId;
+            this.knownRecords = knownRecords;
         }
 
         @Override
@@ -198,8 +203,12 @@
             return jobId;
         }
 
-        public NetworkAddress[] getKnownLocations() {
-            return knownLocations;
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
+
+        public DatasetDirectoryRecord[] getKnownRecords() {
+            return knownRecords;
         }
     }
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 62dc6c1..c7aabb0 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -22,6 +22,6 @@
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
             NetworkAddress networkAddress);
 
-    public NetworkAddress[] getResultPartitionLocations(JobId jobId, NetworkAddress[] knownLocations)
-            throws HyracksDataException;
+    public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
index 715581c..29fabe2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDatasetDirectoryServiceConnection {
@@ -28,6 +27,6 @@
      * @return {@link NetworkAddress[]}
      * @throws Exception
      */
-    public NetworkAddress[] getDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations)
-            throws Exception;
+    public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords) throws Exception;
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
index b4b3a34..d1db448 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDatasetDirectoryServiceInterface {
@@ -23,11 +22,11 @@
      * 
      * @param jobId
      *            ID of the job
-     * @param knownLocations
-     *            Locations that are already known to the client
+     * @param knownRecords
+     *            Locations from the dataset directory that are already known to the client
      * @return {@link NetworkAddress[]}
      * @throws Exception
      */
-    public NetworkAddress[] getDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations)
-            throws Exception;
+    public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords) throws Exception;
 }
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
index a102d18..f2bba2a 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -21,19 +21,24 @@
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannel;
 import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.client.net.ClientNetworkManager;
 import edu.uci.ics.hyracks.comm.channels.DatasetNetworkInputChannel;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -43,20 +48,32 @@
 // TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
 public class HyracksDataset implements IHyracksDataset {
     private final JobId jobId;
+
+    private final List<ResultSetId> rsIds;
+
     private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
+
     private final ClientNetworkManager netManager;
-    private NetworkAddress[] knownLocations;
+
+    private DatasetDirectoryRecord[] knownRecords;
 
     private IDatasetInputChannelMonitor[] monitors;
 
     // TODO:we should probably allow clients to specify this. 32K is the size for now.
     private static int FRAME_SIZE = 32 * 1024;
 
-    public HyracksDataset(JobId jobId, IHyracksDatasetDirectoryServiceConnection ddsc, int nReaders) throws Exception {
+    public HyracksDataset(IHyracksClientConnection hcc, JobSpecification jobSpec, JobId jobId, int nReaders)
+            throws Exception {
         this.jobId = jobId;
-        this.datasetDirectoryServiceConnection = ddsc;
+        this.rsIds = jobSpec.getResultSetIds();
+
+        NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo(jobId);
+        datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection(new String(
+                ddsAddress.getIpAddress()), ddsAddress.getPort());
+
         netManager = new ClientNetworkManager(nReaders);
-        knownLocations = null;
+
+        knownRecords = null;
         monitors = null;
     }
 
@@ -64,7 +81,7 @@
         netManager.start();
     }
 
-    private boolean nullExists(NetworkAddress[] locations) {
+    private boolean nullExists(DatasetDirectoryRecord[] locations) {
         if (locations == null) {
             return true;
         }
@@ -77,11 +94,11 @@
     }
 
     private IDatasetInputChannelMonitor getMontior(int partition) throws HyracksException {
-        if (knownLocations == null || knownLocations[partition] == null) {
+        if (knownRecords == null || knownRecords[partition] == null) {
             throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses.");
         }
         if (monitors == null) {
-            monitors = new DatasetInputChannelMonitor[knownLocations.length];
+            monitors = new DatasetInputChannelMonitor[knownRecords.length];
         }
         if (monitors[partition] == null) {
             monitors[partition] = new DatasetInputChannelMonitor();
@@ -176,11 +193,11 @@
     private void readResults() throws HyracksDataException {
         ByteBuffer buffer = null;
 
-        if (knownLocations == null) {
+        if (knownRecords == null) {
             return;
         }
-        for (int i = 0; i < knownLocations.length; i++) {
-            final NetworkAddress addr = knownLocations[i];
+        for (int i = 0; i < knownRecords.length; i++) {
+            final DatasetDirectoryRecord addr = knownRecords[i];
             if (addr != null) {
                 try {
                     DatasetNetworkInputChannel resultChannel = new DatasetNetworkInputChannel(netManager,
@@ -220,24 +237,26 @@
     @Override
     public ByteBuffer getResults() {
         try {
+            ResultSetId rsId = rsIds.get(0);
             start();
-            while (nullExists(knownLocations)) {
+            while (nullExists(knownRecords)) {
                 try {
-                    knownLocations = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId,
-                            knownLocations);
+                    knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId, rsId,
+                            knownRecords);
                     readResults();
                 } catch (Exception e) {
                     // TODO(madhusudancs) Do something here
                 }
             }
         } catch (IOException e) {
-            // Do something here
+            // TODO(madhusudancs): Do something here
         }
 
         return null;
     }
 
-    private SocketAddress getSocketAddress(NetworkAddress addr) throws UnknownHostException {
-        return new InetSocketAddress(InetAddress.getByAddress(addr.getIpAddress()), addr.getPort());
+    private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
+        NetworkAddress netAddr = addr.getNetworkAddress();
+        return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
     }
 }
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 3907709..1399dcc 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -16,9 +16,10 @@
 
 import java.net.InetSocketAddress;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
@@ -39,7 +40,8 @@
     }
 
     @Override
-    public NetworkAddress[] getDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations) throws Exception {
-        return ddsi.getDatasetResultLocationsFunction(jobId, knownLocations);
+    public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords) throws Exception {
+        return ddsi.getDatasetResultLocationsFunction(jobId, rsId, knownRecords);
     }
 }
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index 167fab8..fb6e41f 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -15,8 +15,9 @@
 package edu.uci.ics.hyracks.client.dataset;
 
 import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
@@ -33,10 +34,10 @@
     }
 
     @Override
-    public NetworkAddress[] getDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations)
-            throws Exception {
+    public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords) throws Exception {
         HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(
-                jobId, knownLocations);
-        return (NetworkAddress[]) rpci.call(ipcHandle, gdrlf);
+                jobId, rsId, knownRecords);
+        return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 36ee412..95eecab 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -37,6 +37,7 @@
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.context.ICCContext;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -346,10 +347,10 @@
                 }
 
                 case GET_DATASET_RESULT_LOCATIONS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
-                            (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
                     workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
-                            .getJobId(), gdrlf.getKnownLocations(), new IPCResponder<NetworkAddress[]>(handle, mid)));
+                            .getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
+                            new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
                     return;
                 }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 5d49bbf..ba1f750 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -60,15 +60,24 @@
     }
 
     @Override
-    public synchronized NetworkAddress[] getResultPartitionLocations(JobId jobId, NetworkAddress[] knownLocations)
-            throws HyracksDataException {
-        while (Arrays.equals(jobPartitionLocationsMap.get(jobId), knownLocations)) {
+    public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
+        while (!newRecords(jobId, rsId, knownRecords)) {
             try {
                 wait();
             } catch (InterruptedException e) {
                 throw new HyracksDataException(e);
             }
         }
-        return jobPartitionLocationsMap.get(jobId);
+        return jobResultLocationsMap.get(jobId).get(rsId);
+    }
+
+    private boolean newRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) {
+        Map<ResultSetId, DatasetDirectoryRecord[]> rsMap = jobResultLocationsMap.get(jobId);
+        if (rsMap == null) {
+            return false;
+        }
+        DatasetDirectoryRecord[] records = rsMap.get(rsId);
+        return !Arrays.equals(records, knownRecords);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
index 814feb0..fd1d418 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -14,8 +14,9 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -24,15 +25,21 @@
 
 public class GetResultPartitionLocationsWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
-    private final JobId jobId;
-    private final NetworkAddress[] knownLocations;
-    private final IResultCallback<NetworkAddress[]> callback;
 
-    public GetResultPartitionLocationsWork(ClusterControllerService ccs, JobId jobId, NetworkAddress[] knownLocations,
-            IResultCallback<NetworkAddress[]> callback) {
+    private final JobId jobId;
+
+    private final ResultSetId rsId;
+
+    private final DatasetDirectoryRecord[] knownRecords;
+
+    private final IResultCallback<DatasetDirectoryRecord[]> callback;
+
+    public GetResultPartitionLocationsWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
         this.ccs = ccs;
         this.jobId = jobId;
-        this.knownLocations = knownLocations;
+        this.rsId = rsId;
+        this.knownRecords = knownRecords;
         this.callback = callback;
     }
 
@@ -43,7 +50,8 @@
             @Override
             public void run() {
                 try {
-                    NetworkAddress[] partitionLocations = dds.getResultPartitionLocations(jobId, knownLocations);
+                    DatasetDirectoryRecord[] partitionLocations = dds.getResultPartitionLocations(jobId, rsId,
+                            knownRecords);
                     callback.setValue(partitionLocations);
                 } catch (HyracksDataException e) {
                     callback.setException(e);
