diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 3ff2c46..62dc6c1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -19,7 +19,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IDatasetDirectoryService {
-    public void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
             NetworkAddress networkAddress);
 
     public NetworkAddress[] getResultPartitionLocations(JobId jobId, NetworkAddress[] knownLocations)
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 862d9aa..6c1ff10 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -21,8 +21,8 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IDatasetPartitionManager {
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, int partition, int nPartitions)
-            throws HyracksException;
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, int partition,
+            int nPartitions) throws HyracksException;
 
     public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 94f7015..36ee412 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -451,7 +451,8 @@
                 case REGISTER_RESULT_PARTITION_LOCATION: {
                     CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
                     workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
-                            .getJobId(), rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
+                            .getJobId(), rrplf.getResultSetId(), rrplf.getPartition(), rrplf.getNPartitions(), rrplf
+                            .getNetworkAddress()));
                     return;
                 }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index e42b178..5d49bbf 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -20,6 +20,7 @@
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 
@@ -30,22 +31,31 @@
  * all the results) completely. Then we can just get rid of the location information for that job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
-    private final Map<JobId, NetworkAddress[]> jobPartitionLocationsMap;
+    private final Map<JobId, Map<ResultSetId, DatasetDirectoryRecord[]>> jobResultLocationsMap;
 
     public DatasetDirectoryService() {
-        jobPartitionLocationsMap = new HashMap<JobId, NetworkAddress[]>();
+        jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, DatasetDirectoryRecord[]>>();
     }
 
     @Override
-    public synchronized void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions,
-            NetworkAddress networkAddress) {
-        NetworkAddress[] partitionLocations = jobPartitionLocationsMap.get(jobId);
-        if (partitionLocations == null) {
-            partitionLocations = new NetworkAddress[nPartitions];
-            jobPartitionLocationsMap.put(jobId, partitionLocations);
+    public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition,
+            int nPartitions, NetworkAddress networkAddress) {
+        Map<ResultSetId, DatasetDirectoryRecord[]> resultSetsMap = jobResultLocationsMap.get(jobId);
+        if (resultSetsMap == null) {
+            resultSetsMap = new HashMap<ResultSetId, DatasetDirectoryRecord[]>();
+            jobResultLocationsMap.put(jobId, resultSetsMap);
         }
 
-        partitionLocations[partition] = networkAddress;
+        DatasetDirectoryRecord[] records = resultSetsMap.get(rsId);
+        if (records == null) {
+            records = new DatasetDirectoryRecord[nPartitions];
+            resultSetsMap.put(rsId, records);
+        }
+
+        if (records[partition] == null) {
+            records[partition] = new DatasetDirectoryRecord();
+        }
+        records[partition].setNetworkAddress(networkAddress);
         notifyAll();
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index e18572c..3c762eb 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -15,21 +15,29 @@
 package edu.uci.ics.hyracks.control.cc.work;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
 public class RegisterResultPartitionLocationWork extends AbstractWork {
     private final ClusterControllerService ccs;
+
     private final JobId jobId;
+
+    private final ResultSetId rsId;
+
     private final int partition;
+
     private final int nPartitions;
+
     private final NetworkAddress networkAddress;
 
-    public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, int partition,
-            int nPartitions, NetworkAddress networkAddress) {
+    public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+            int partition, int nPartitions, NetworkAddress networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
+        this.rsId = rsId;
         this.partition = partition;
         this.nPartitions = nPartitions;
         this.networkAddress = networkAddress;
@@ -37,12 +45,13 @@
 
     @Override
     public void run() {
-        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, partition, nPartitions, networkAddress);
+        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, partition, nPartitions,
+                networkAddress);
     }
 
     @Override
     public String toString() {
-        return "JobId@" + jobId + "Partition@" + partition + "NPartitions@" + nPartitions + "ResultPartitionLocation@"
-                + networkAddress;
+        return "JobId@" + jobId + " Partition@" + partition + " ResultSetId@" + rsId + " NPartitions@" + nPartitions
+                + " ResultPartitionLocation@" + networkAddress;
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index b0b81f4..086fc4d 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
@@ -47,8 +48,8 @@
 
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
 
-    public void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions, NetworkAddress addr)
-            throws Exception;
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
+            NetworkAddress addr) throws Exception;
 
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 56e9830..e3591ce 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -36,6 +36,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -444,14 +445,18 @@
 
         private final JobId jobId;
 
+        private final ResultSetId rsId;
+
         private final int partition;
 
         private final int nPartitions;
 
         private NetworkAddress networkAddress;
 
-        public RegisterResultPartitionLocationFunction(JobId jobId, int partition, int nPartitions, NetworkAddress networkAddress) {
+        public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
+                NetworkAddress networkAddress) {
             this.jobId = jobId;
+            this.rsId = rsId;
             this.partition = partition;
             this.nPartitions = nPartitions;
             this.networkAddress = networkAddress;
@@ -466,6 +471,10 @@
             return jobId;
         }
 
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
+
         public int getPartition() {
             return partition;
         }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 76b7043..595efd1 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,6 +18,7 @@
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
@@ -96,10 +97,10 @@
     }
 
     @Override
-    public void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
             NetworkAddress networkAddress) throws Exception {
         CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
-                jobId, partition, nPartitions, networkAddress);
+                jobId, rsId, partition, nPartitions, networkAddress);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 9d9d5fc..7d9b10e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -49,12 +50,12 @@
     }
 
     @Override
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, int partition, int nPartitions)
-            throws HyracksException {
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, int partition,
+            int nPartitions) throws HyracksException {
         DatasetPartitionWriter dpw = null;
         JobId jobId = ctx.getJobletContext().getJobId();
         try {
-            ncs.getClusterController().registerResultPartitionLocation(jobId, partition, nPartitions,
+            ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, partition, nPartitions,
                     ncs.getDatasetNetworkManager().getNetworkAddress());
             dpw = new DatasetPartitionWriter(ctx, this, partition, executor);
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 31b9af8..dde8669 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -30,8 +31,11 @@
 public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
 
-    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec) {
+    private final ResultSetId rsId;
+
+    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId) {
         super(spec, 1, 0);
+        this.rsId = rsId;
     }
 
     @Override
@@ -45,7 +49,7 @@
             @Override
             public void open() throws HyracksDataException {
                 try {
-                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, partition, nPartitions);
+                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, partition, nPartitions);
                     datasetPartitionWriter.open();
                 } catch (HyracksException e) {
                     throw new HyracksDataException(e);
