Lock conservatively for writes and don't lock result state at all for references.
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index 4a3c72d..8fef455 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -49,8 +49,8 @@
         resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
     }
 
-    public Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
-            throws OutOfMemoryError, HyracksDataException {
+    public synchronized Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
+            throws HyracksDataException {
         Page page;
         if (availPages.isEmpty()) {
             if (availableMemory >= FRAME_SIZE) {
@@ -94,8 +94,7 @@
         resultPartitionNodesMap.put(resultSetPartitionId, pn);
     }
 
-    protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
-            ResultState resultState) {
+    protected PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
         PartitionNode pn = null;
 
         if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
@@ -105,14 +104,16 @@
             }
             return pn;
         }
-        pn = resultPartitionNodesMap.get(resultSetPartitionId);
-        leastRecentlyUsedList.remove(pn);
-        insertPartitionNode(resultSetPartitionId, pn);
+        synchronized (this) {
+            pn = resultPartitionNodesMap.get(resultSetPartitionId);
+            leastRecentlyUsedList.remove(pn);
+            insertPartitionNode(resultSetPartitionId, pn);
+        }
 
         return pn;
     }
 
-    protected synchronized Page evictPage() throws HyracksDataException {
+    protected Page evictPage() throws HyracksDataException {
         PartitionNode pn = leastRecentlyUsedList.getFirst();
         ResultState resultState = pn.getResultState();
         Page page = resultState.returnPage();
@@ -144,7 +145,7 @@
         return page;
     }
 
-    protected synchronized Page getAvailablePage() {
+    protected Page getAvailablePage() {
         Iterator<Page> iter = availPages.iterator();
         Page page = iter.next();
         iter.remove();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 73044a1..09d1bff 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -49,12 +49,6 @@
 
     private final List<Page> localPageList;
 
-    private final List<Integer> inMemoryList;
-
-    private Page tailPage;
-
-    private int tailPageLength;
-
     private FileReference fileRef;
 
     private IFileHandle writeFileHandle;
@@ -74,10 +68,7 @@
         eos = new AtomicBoolean(false);
         failed = new AtomicBoolean(false);
         localPageList = new ArrayList<Page>();
-        inMemoryList = new ArrayList<Integer>();
 
-        tailPage = null;
-        tailPageLength = 0;
         fileRef = null;
         writeFileHandle = null;
     }
@@ -88,16 +79,14 @@
     }
 
     public synchronized void close() {
-        if (tailPage != null) {
-            localPageList.add(tailPage);
-            inMemoryList.add(localPageList.size() - 1);
-            size += tailPageLength;
-        }
         eos.set(true);
         notifyAll();
     }
 
     public synchronized void closeAndDelete() {
+        // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs
+        // to be taken when there are more requests to these result states.
+        failed.set(true);
         if (writeFileHandle != null) {
             try {
                 ioManager.close(writeFileHandle);
@@ -113,23 +102,21 @@
     public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
             throws HyracksDataException {
         int srcOffset = 0;
+        Page destPage = null;
 
-        if (tailPage == null) {
-            tailPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+        if (!localPageList.isEmpty()) {
+            destPage = localPageList.get(localPageList.size() - 1);
         }
 
         while (srcOffset < buffer.limit()) {
-            if (tailPage.getBuffer().remaining() <= 0) {
-                localPageList.add(tailPage);
-                inMemoryList.add(localPageList.size() - 1);
-                size += tailPageLength;
-                tailPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
-                tailPageLength = 0;
+            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+                destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+                localPageList.add(destPage);
             }
-            int srcLength = Math.min(buffer.limit() - srcOffset, tailPage.getBuffer().remaining());
-            tailPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
             srcOffset += srcLength;
-            tailPageLength += srcLength;
+            size += srcLength;
         }
 
         notifyAll();
@@ -145,7 +132,7 @@
         }
     }
 
-    public synchronized long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+    public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
             throws HyracksDataException {
         long readSize = 0;
         synchronized (this) {
@@ -156,31 +143,30 @@
                     throw new HyracksDataException(e);
                 }
             }
-        }
 
-        if ((offset >= size && eos.get()) || failed.get()) {
-            return readSize;
-        }
-
-        if (offset < persistentSize) {
-            if (readFileHandle == null) {
-                initReadFileHandle();
-            }
-            readSize = ioManager.syncRead(readFileHandle, offset, buffer);
-        }
-
-        if (readSize < buffer.capacity()) {
-            long localPageOffset = offset - persistentSize;
-            int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
-            int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
-            Page page = getPage(localPageIndex);
-            if (page == null) {
+            if ((offset >= size && eos.get()) || failed.get()) {
                 return readSize;
             }
-            readSize += buffer.remaining();
-            buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
-        }
 
+            if (offset < persistentSize) {
+                if (readFileHandle == null) {
+                    initReadFileHandle();
+                }
+                readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+            }
+
+            if (readSize < buffer.capacity()) {
+                long localPageOffset = offset - persistentSize;
+                int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
+                int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
+                Page page = getPage(localPageIndex);
+                if (page == null) {
+                    return readSize;
+                }
+                readSize += buffer.remaining();
+                buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+            }
+        }
         datasetMemoryManager.pageReferenced(resultSetPartitionId);
         return readSize;
     }
@@ -265,9 +251,8 @@
 
     private Page removePage() {
         Page page = null;
-        if (!inMemoryList.isEmpty()) {
-            int index = inMemoryList.get(inMemoryList.size() - 1);
-            page = localPageList.set(index, null);
+        if (!localPageList.isEmpty()) {
+            page = localPageList.remove(localPageList.size() - 1);
         }
         return page;
     }