Implement a basic memory manager to cache results from jobs and use the writer and reader to store and distribute the results.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3067 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/ResultSetPartitionId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/ResultSetPartitionId.java
new file mode 100644
index 0000000..148a8a2
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/ResultSetPartitionId.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.partitions;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public final class ResultSetPartitionId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final JobId jobId;
+
+    private final ResultSetId resultSetId;
+    
+    private final int partition;
+
+    public ResultSetPartitionId(JobId jobId, ResultSetId resultSetId, int partition) {
+        this.jobId = jobId;
+        this.resultSetId = resultSetId;
+        this.partition = partition;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public ResultSetId getResultSetId() {
+        return resultSetId;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((resultSetId == null) ? 0 : resultSetId.hashCode());
+        result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
+        result = prime * result + partition;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ResultSetPartitionId other = (ResultSetPartitionId) obj;
+        if (resultSetId == null) {
+            if (other.resultSetId != null)
+                return false;
+        } else if (!resultSetId.equals(other.resultSetId))
+            return false;
+        if (jobId == null) {
+            if (other.jobId != null)
+                return false;
+        } else if (!jobId.equals(other.jobId))
+            return false;
+        if (partition != other.partition)
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return jobId.toString() + ":" + resultSetId + ":" + partition;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
new file mode 100644
index 0000000..cecd677
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
+
+public class DatasetMemoryManager {
+    private final Set<Page> availPages;
+
+    private final LeastRecentlyUsedList leastRecentlyUsedList;
+
+    private final Map<ResultSetPartitionId, PartitionNode> resultPartitionNodesMap;
+
+    private final static int FRAME_SIZE = 32768;
+
+    public DatasetMemoryManager(int availableMemory) {
+        availPages = new HashSet<Page>();
+
+        // Atleast have one page for temporarily storing the results.
+        if (availableMemory <= 0)
+            availableMemory = FRAME_SIZE;
+
+        while (availableMemory >= FRAME_SIZE) {
+            /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
+             * instead of direct ByteBuffer.allocate()?
+             */
+            availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
+            availableMemory -= FRAME_SIZE;
+        }
+
+        leastRecentlyUsedList = new LeastRecentlyUsedList();
+        resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
+    }
+
+    public Page requestPage(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter dpw)
+            throws OutOfMemoryError, HyracksDataException {
+        Page page;
+        if (availPages.isEmpty()) {
+            page = evictPage();
+        } else {
+            page = getAvailablePage();
+        }
+
+        page.clear();
+
+        /*
+         * It is extremely important to update the reference after obtaining the page because, in the cases where memory
+         * manager is allocated only one page of memory, the front of the LRU list should not be created by the
+         * update reference call before a page is pushed on to the element of the LRU list. So we first obtain the page,
+         * then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it.
+         */
+        PartitionNode pn = updateReference(resultSetPartitionId, dpw);
+        pn.add(page);
+        return page;
+    }
+
+    public void pageReferenced(ResultSetPartitionId resultSetPartitionId) {
+        // When a page is referenced the dataset partition writer should already be known, so we pass null.
+        updateReference(resultSetPartitionId, null);
+    }
+
+    public int getPageSize() {
+        return FRAME_SIZE;
+    }
+
+    protected void insertPartitionNode(ResultSetPartitionId resultSetPartitionId, PartitionNode pn) {
+        leastRecentlyUsedList.add(pn);
+        resultPartitionNodesMap.put(resultSetPartitionId, pn);
+    }
+
+    protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
+            IDatasetPartitionWriter dpw) {
+        PartitionNode pn = null;
+
+        if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
+            if (dpw != null) {
+                pn = new PartitionNode(resultSetPartitionId, dpw);
+                insertPartitionNode(resultSetPartitionId, pn);
+            }
+            return pn;
+        }
+        pn = resultPartitionNodesMap.get(resultSetPartitionId);
+        leastRecentlyUsedList.remove(pn);
+        insertPartitionNode(resultSetPartitionId, pn);
+
+        return pn;
+    }
+
+    protected synchronized Page evictPage() throws HyracksDataException {
+        PartitionNode pn = leastRecentlyUsedList.getFirst();
+        IDatasetPartitionWriter dpw = pn.getDatasetPartitionWriter();
+        Page page = dpw.returnPage();
+
+        /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take
+         * away all the pages allocated to it and add to the available pages set.
+         */
+        if (page == null) {
+            availPages.addAll(pn);
+            pn.clear();
+            resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
+            leastRecentlyUsedList.remove(pn);
+
+            /* Based on the assumption that if the dataset partition writer returned a null page, it should be lying about
+             * the number of pages it holds in which case we just evict all the pages it holds and should thus be able to
+             * add all those pages to available set and we have at least one page to allocate back.
+             */
+            page = getAvailablePage();
+        } else {
+            pn.remove(page);
+
+            // If the partition no more holds any pages, remove it from the linked list and the hash map.
+            if (pn.isEmpty()) {
+                resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
+                leastRecentlyUsedList.remove(pn);
+            }
+        }
+
+        return page;
+    }
+
+    protected synchronized Page getAvailablePage() {
+        Iterator<Page> iter = availPages.iterator();
+        Page page = iter.next();
+        iter.remove();
+        return page;
+    }
+
+    private class LeastRecentlyUsedList {
+        private PartitionNode head;
+
+        private PartitionNode tail;
+
+        public LeastRecentlyUsedList() {
+            head = null;
+            tail = null;
+        }
+
+        public void add(PartitionNode node) {
+            if (head == null) {
+                head = tail = node;
+                return;
+            }
+            tail.setNext(node);
+            node.setPrev(tail);
+            tail = node;
+        }
+
+        public void remove(PartitionNode node) {
+            if ((node == head) && (node == tail)) {
+                head = tail = null;
+                return;
+            } else if (node == head) {
+                head = head.getNext();
+                head.setPrev(null);
+                return;
+            } else if (node == tail) {
+                tail = tail.getPrev();
+                tail.setNext(null);
+                return;
+            } else {
+                PartitionNode prev = node.getPrev();
+                PartitionNode next = node.getNext();
+                prev.setNext(next);
+                next.setPrev(prev);
+            }
+        }
+
+        public PartitionNode getFirst() {
+            return head;
+        }
+    }
+
+    private class PartitionNode extends HashSet<Page> {
+        private static final long serialVersionUID = 1L;
+
+        private final ResultSetPartitionId resultSetPartitionId;
+
+        private final IDatasetPartitionWriter datasetPartitionWriter;
+
+        private PartitionNode prev;
+
+        private PartitionNode next;
+
+        public PartitionNode(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter datasetPartitionWriter) {
+            this.resultSetPartitionId = resultSetPartitionId;
+            this.datasetPartitionWriter = datasetPartitionWriter;
+            prev = null;
+            next = null;
+        }
+
+        public ResultSetPartitionId getResultSetPartitionId() {
+            return resultSetPartitionId;
+        }
+
+        public IDatasetPartitionWriter getDatasetPartitionWriter() {
+            return datasetPartitionWriter;
+        }
+
+        public void setPrev(PartitionNode node) {
+            prev = node;
+        }
+
+        public PartitionNode getPrev() {
+            return prev;
+        }
+
+        public void setNext(PartitionNode node) {
+            next = node;
+        }
+
+        public PartitionNode getNext() {
+            return next;
+        }
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index d9934e4..1cad54b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
@@ -35,18 +36,21 @@
 
     private final Executor executor;
 
-    private final Map<JobId, DatasetPartitionWriter[]> partitionDatasetWriterMap;
+    private final Map<JobId, ResultState[]> partitionResultStateMap;
 
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
     private final IWorkspaceFileFactory fileFactory;
 
+    private final DatasetMemoryManager datasetMemoryManager;
+
     public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory) {
         this.ncs = ncs;
         this.executor = executor;
-        partitionDatasetWriterMap = new HashMap<JobId, DatasetPartitionWriter[]>();
+        partitionResultStateMap = new HashMap<JobId, ResultState[]>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
+        datasetMemoryManager = new DatasetMemoryManager(availableMemory);
     }
 
     @Override
@@ -57,14 +61,14 @@
         try {
             ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
                     nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
-            dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, executor);
+            dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
 
-            DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
-            if (writers == null) {
-                writers = new DatasetPartitionWriter[nPartitions];
-                partitionDatasetWriterMap.put(jobId, writers);
+            ResultState[] resultStates = partitionResultStateMap.get(jobId);
+            if (resultStates == null) {
+                resultStates = new ResultState[nPartitions];
+                partitionResultStateMap.put(jobId, resultStates);
             }
-            writers[partition] = dpw;
+            resultStates[partition] = dpw.getResultState();
         } catch (Exception e) {
             throw new HyracksException(e);
         }
@@ -93,17 +97,19 @@
     @Override
     public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
             throws HyracksException {
-        DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
-        if (writers == null) {
+        ResultState[] resultStates = partitionResultStateMap.get(jobId);
+
+        if (resultStates == null) {
             throw new HyracksException("Unknown JobId " + jobId);
         }
 
-        DatasetPartitionWriter dpw = writers[partition];
-        if (dpw == null) {
+        ResultState resultState = resultStates[partition];
+        if (resultState == null) {
             throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
         }
 
-        dpw.writeTo(writer);
+        IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+        dpr.writeTo(writer);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
new file mode 100644
index 0000000..7b2b30a
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
+
+public class DatasetPartitionReader implements IDatasetPartitionReader {
+    private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
+
+    private final DatasetMemoryManager datasetMemoryManager;
+
+    private final Executor executor;
+
+    private final ResultState resultState;
+
+    private IFileHandle fileHandle;
+
+    public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
+        this.datasetMemoryManager = datasetMemoryManager;
+        this.executor = executor;
+        this.resultState = resultState;
+    }
+
+    private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+        long readSize = 0;
+        synchronized (resultState) {
+            while (offset >= resultState.getSize() && !resultState.getEOS()) {
+                try {
+                    resultState.wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        }
+
+        if (offset >= resultState.getSize() && resultState.getEOS()) {
+            return readSize;
+        }
+
+        if (offset < resultState.getPersistentSize()) {
+            readSize = resultState.getIOManager().syncRead(fileHandle, offset, buffer);
+        }
+
+        if (readSize < buffer.capacity()) {
+            long localPageOffset = offset - resultState.getPersistentSize();
+            int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
+            int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
+            Page page = resultState.getPage(localPageIndex);
+            readSize += buffer.remaining();
+            buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+        }
+
+        datasetMemoryManager.pageReferenced(resultState.getResultSetPartitionId());
+        return readSize;
+    }
+
+    @Override
+    public void writeTo(final IFrameWriter writer) {
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                NetworkOutputChannel channel = (NetworkOutputChannel) writer;
+                channel.setFrameSize(resultState.getFrameSize());
+                try {
+                    fileHandle = resultState.getIOManager().open(resultState.getFileReference(),
+                            IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    channel.open();
+                    try {
+                        long offset = 0;
+                        ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize());
+                        while (true) {
+                            buffer.clear();
+                            long size = read(offset, buffer);
+                            if (size <= 0) {
+                                break;
+                            } else if (size < buffer.limit()) {
+                                throw new HyracksDataException("Premature end of file - readSize: " + size
+                                        + " buffer limit: " + buffer.limit());
+                            }
+                            offset += size;
+                            buffer.flip();
+                            channel.nextFrame(buffer);
+                        }
+                    } finally {
+                        channel.close();
+                        resultState.getIOManager().close(fileHandle);
+                    }
+                } catch (HyracksDataException e) {
+                    throw new RuntimeException(e);
+                }
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
+                }
+            }
+        });
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 0f5895e..f6ae540 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -15,14 +15,13 @@
 package edu.uci.ics.hyracks.control.nc.dataset;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
+import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -30,16 +29,13 @@
 import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.partitions.IPartition;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
+import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
-public class DatasetPartitionWriter implements IFrameWriter, IPartition {
+public class DatasetPartitionWriter implements IDatasetPartitionWriter {
     private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
 
     private static final String FILE_PREFIX = "result_";
 
-    private final IHyracksTaskContext ctx;
-
     private final IDatasetPartitionManager manager;
 
     private final JobId jobId;
@@ -48,25 +44,28 @@
 
     private final int partition;
 
-    private final Executor executor;
+    private final DatasetMemoryManager datasetMemoryManager;
 
-    private final AtomicBoolean eos;
+    private final ResultSetPartitionId resultSetPartitionId;
 
-    private FileReference fRef;
+    private final ResultState resultState;
 
-    private IFileHandle handle;
-
-    private long size;
+    private IFileHandle fileHandle;
 
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
-            ResultSetId rsId, int partition, Executor executor) {
-        this.ctx = ctx;
+            ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
         this.partition = partition;
-        this.executor = executor;
-        eos = new AtomicBoolean(false);
+        this.datasetMemoryManager = datasetMemoryManager;
+
+        resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
+        resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), ctx.getFrameSize());
+    }
+
+    public ResultState getResultState() {
+        return resultState;
     }
 
     @Override
@@ -74,16 +73,32 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("open(" + partition + ")");
         }
-        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(FILE_PREFIX + String.valueOf(partition));
-        handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+        String fName = FILE_PREFIX + String.valueOf(partition);
+        FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
+        fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-        size = 0;
+        resultState.init(fRef);
     }
 
     @Override
-    public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        size += ctx.getIOManager().syncWrite(handle, size, buffer);
-        notifyAll();
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        int srcOffset = 0;
+        Page destPage = resultState.getLastPage();
+
+        while (srcOffset < buffer.limit()) {
+            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+                destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+                resultState.addPage(destPage);
+            }
+            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+            srcOffset += srcLength;
+            resultState.incrementSize(srcLength);
+        }
+
+        synchronized (resultState) {
+            resultState.notifyAll();
+        }
     }
 
     @Override
@@ -96,14 +111,16 @@
     }
 
     @Override
-    public synchronized void close() throws HyracksDataException {
+    public void close() throws HyracksDataException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("close(" + partition + ")");
         }
 
         try {
-            eos.set(true);
-            notifyAll();
+            synchronized (resultState) {
+                resultState.setEOS(true);
+                resultState.notifyAll();
+            }
             manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
@@ -111,63 +128,21 @@
     }
 
     @Override
-    public IHyracksTaskContext getTaskContext() {
-        return ctx;
-    }
+    public Page returnPage() throws HyracksDataException {
+        Page page = resultState.removePage(0);
 
-    private synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
-        while (offset >= size && !eos.get()) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
-            }
+        IIOManager ioManager = resultState.getIOManager();
+
+        // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
+        if (page == null) {
+            ioManager.close(fileHandle);
+            return null;
         }
-        return ctx.getIOManager().syncRead(handle, offset, buffer);
-    }
 
-    @Override
-    public void writeTo(final IFrameWriter writer) {
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-                NetworkOutputChannel channel = (NetworkOutputChannel) writer;
-                channel.setTaskContext(ctx);
-                try {
-                    channel.open();
-                    try {
-                        long offset = 0;
-                        ByteBuffer buffer = ctx.allocateFrame();
-                        while (true) {
-                            buffer.clear();
-                            long size = read(offset, buffer);
-                            if (size < 0) {
-                                break;
-                            } else if (size < buffer.capacity()) {
-                                throw new HyracksDataException("Premature end of file");
-                            }
-                            offset += size;
-                            buffer.flip();
-                            channel.nextFrame(buffer);
-                        }
-                    } finally {
-                        channel.close();
-                        ctx.getIOManager().close(handle);
-                    }
-                } catch (HyracksDataException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-    }
+        page.getBuffer().flip();
 
-    @Override
-    public boolean isReusable() {
-        return true;
-    }
-
-    @Override
-    public void deallocate() {
-
+        long delta = ioManager.syncWrite(fileHandle, resultState.getPersistentSize(), page.getBuffer());
+        resultState.incrementPersistentSize(delta);
+        return page;
     }
 }