Read results for each result set id.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 0c5a5bb..dea077d 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -29,7 +29,8 @@
 
     public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
 
-    public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
+    public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
+            throws HyracksException;
 
     public void abortReader(JobId jobId);
 
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 08ad0d7..33e1b01 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -101,7 +101,7 @@
                             knownRecords);
                     lastReadPartition = 0;
                     resultChannel = new DatasetNetworkInputChannel(netManager,
-                            getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
+                            getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId, lastReadPartition,
                             NUM_READ_BUFFERS);
                     lastMonitor = getMonitor(lastReadPartition);
                     resultChannel.open(datasetClientCtx);
@@ -143,8 +143,8 @@
                         }
 
                         resultChannel = new DatasetNetworkInputChannel(netManager,
-                                getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
-                                NUM_READ_BUFFERS);
+                                getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId,
+                                lastReadPartition, NUM_READ_BUFFERS);
                         lastMonitor = getMonitor(lastReadPartition);
                         resultChannel.open(datasetClientCtx);
                         resultChannel.registerMonitor(lastMonitor);
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
index fac2949..1ab315b 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
@@ -41,6 +42,8 @@
 
     private final JobId jobId;
 
+    private final ResultSetId resultSetId;
+
     private final int partition;
 
     private final Queue<ByteBuffer> fullQueue;
@@ -54,10 +57,11 @@
     private Object attachment;
 
     public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId,
-            int partition, int nBuffers) {
+            ResultSetId resultSetId, int partition, int nBuffers) {
         this.netManager = netManager;
         this.remoteAddress = remoteAddress;
         this.jobId = jobId;
+        this.resultSetId = resultSetId;
         this.partition = partition;
         fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
         this.nBuffers = nBuffers;
@@ -103,6 +107,7 @@
         }
         ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
         writeBuffer.putLong(jobId.getId());
+        writeBuffer.putLong(resultSetId.getId());
         writeBuffer.putInt(partition);
         writeBuffer.flip();
         if (LOGGER.isLoggable(Level.FINE)) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index ef2902e..2ec7acc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.nc.dataset;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -38,7 +39,7 @@
 
     private final Executor executor;
 
-    private final Map<JobId, ResultState[]> partitionResultStateMap;
+    private final Map<JobId, Map<ResultSetId, ResultState[]>> partitionResultStateMap;
 
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
@@ -57,18 +58,23 @@
         } else {
             datasetMemoryManager = null;
         }
-        partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
+        partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
             private static final long serialVersionUID = 1L;
 
-            protected boolean removeEldestEntry(Map.Entry<JobId, ResultState[]> eldest) {
+            protected boolean removeEldestEntry(Map.Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
                 synchronized (DatasetPartitionManager.this) {
                     if (size() > resultHistorySize) {
-                        ResultState[] resultStates = eldest.getValue();
-                        for (int i = 0; i < resultStates.length; i++) {
-                            ResultState state = resultStates[i];
-                            if (state != null) {
-                                state.closeAndDelete();
-                                LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
+                        Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(eldest.getValue());
+                        for (ResultSetId rsId : rsIdMap.keySet()) {
+                            ResultState[] resultStates = rsIdMap.get(rsId);
+                            if (resultStates != null) {
+                                for (int i = 0; i < resultStates.length; i++) {
+                                    ResultState state = resultStates[i];
+                                    if (state != null) {
+                                        state.closeAndDelete();
+                                        LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
+                                    }
+                                }
                             }
                         }
                         return true;
@@ -90,10 +96,16 @@
                         nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
                 dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
 
-                ResultState[] resultStates = partitionResultStateMap.get(jobId);
+                Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+                if (rsIdMap == null) {
+                    rsIdMap = new HashMap<ResultSetId, ResultState[]>();
+                    partitionResultStateMap.put(jobId, rsIdMap);
+                }
+
+                ResultState[] resultStates = rsIdMap.get(rsId);
                 if (resultStates == null) {
                     resultStates = new ResultState[nPartitions];
-                    partitionResultStateMap.put(jobId, resultStates);
+                    rsIdMap.put(rsId, resultStates);
                 }
                 resultStates[partition] = dpw.getResultState();
             }
@@ -128,16 +140,21 @@
     }
 
     @Override
-    public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
-            throws HyracksException {
+    public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition,
+            IFrameWriter writer) throws HyracksException {
         ResultState resultState;
         synchronized (this) {
-            ResultState[] resultStates = partitionResultStateMap.get(jobId);
+            Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
 
-            if (resultStates == null) {
+            if (rsIdMap == null) {
                 throw new HyracksException("Unknown JobId " + jobId);
             }
 
+            ResultState[] resultStates = rsIdMap.get(resultSetId);
+            if (resultStates == null) {
+                throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId);
+            }
+
             resultState = resultStates[partition];
             if (resultState == null) {
                 throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
@@ -146,19 +163,26 @@
 
         DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
         dpr.writeTo(writer);
-        LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":partition: " + partition);
+        LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: "
+                + partition);
     }
 
     @Override
     public synchronized void abortReader(JobId jobId) {
-        ResultState[] resultStates = partitionResultStateMap.get(jobId);
+        Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
 
-        if (resultStates == null) {
+        if (rsIdMap == null) {
             return;
         }
-        for (ResultState state : resultStates) {
-            if (state != null) {
-                state.abort();
+
+        for (ResultSetId rsId : rsIdMap.keySet()) {
+            ResultState[] resultStates = rsIdMap.get(rsId);
+            if (resultStates != null) {
+                for (ResultState state : resultStates) {
+                    if (state != null) {
+                        state.abort();
+                    }
+                }
             }
         }
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
index 5b8b333..84baf49 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -24,6 +24,7 @@
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
@@ -95,6 +96,7 @@
         @Override
         public void accept(ByteBuffer buffer) {
             JobId jobId = new JobId(buffer.getLong());
+            ResultSetId rsId = new ResultSetId(buffer.getLong());
             int partition = buffer.getInt();
             if (LOGGER.isLoggable(Level.FINE)) {
                 LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
@@ -102,7 +104,7 @@
             }
             noc = new NetworkOutputChannel(ccb, 1);
             try {
-                partitionManager.initializeDatasetPartitionReader(jobId, partition, noc);
+                partitionManager.initializeDatasetPartitionReader(jobId, rsId, partition, noc);
             } catch (HyracksException e) {
                 noc.abort();
             }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index e74e54c..715f822 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -60,7 +60,7 @@
     private static IHyracksClientConnection hcc;
 
     private final List<File> outputFiles;
-    
+
     protected static int DEFAULT_MEM_PAGE_SIZE = 32768;
     protected static int DEFAULT_MEM_NUM_PAGES = 1000;