Refactor result caching mechanism.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
deleted file mode 100644
index 8f5ed64..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-
-public interface IDatasetPartitionReader {
-    public void writeTo(IFrameWriter writer);
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
deleted file mode 100644
index 42dc157..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IDatasetPartitionWriter extends IFrameWriter {
-    public Page returnPage() throws HyracksDataException;
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index 2386849..4a3c72d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -21,7 +21,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
 import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
@@ -50,7 +49,7 @@
         resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
     }
 
-    public Page requestPage(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter dpw)
+    public Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
             throws OutOfMemoryError, HyracksDataException {
         Page page;
         if (availPages.isEmpty()) {
@@ -76,7 +75,7 @@
          * update reference call before a page is pushed on to the element of the LRU list. So we first obtain the page,
          * then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it.
          */
-        PartitionNode pn = updateReference(resultSetPartitionId, dpw);
+        PartitionNode pn = updateReference(resultSetPartitionId, resultState);
         pn.add(page);
         return page;
     }
@@ -96,12 +95,12 @@
     }
 
     protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
-            IDatasetPartitionWriter dpw) {
+            ResultState resultState) {
         PartitionNode pn = null;
 
         if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
-            if (dpw != null) {
-                pn = new PartitionNode(resultSetPartitionId, dpw);
+            if (resultState != null) {
+                pn = new PartitionNode(resultSetPartitionId, resultState);
                 insertPartitionNode(resultSetPartitionId, pn);
             }
             return pn;
@@ -115,8 +114,8 @@
 
     protected synchronized Page evictPage() throws HyracksDataException {
         PartitionNode pn = leastRecentlyUsedList.getFirst();
-        IDatasetPartitionWriter dpw = pn.getDatasetPartitionWriter();
-        Page page = dpw.returnPage();
+        ResultState resultState = pn.getResultState();
+        Page page = resultState.returnPage();
 
         /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take
          * away all the pages allocated to it and add to the available pages set.
@@ -202,15 +201,15 @@
 
         private final ResultSetPartitionId resultSetPartitionId;
 
-        private final IDatasetPartitionWriter datasetPartitionWriter;
+        private final ResultState resultState;
 
         private PartitionNode prev;
 
         private PartitionNode next;
 
-        public PartitionNode(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter datasetPartitionWriter) {
+        public PartitionNode(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
             this.resultSetPartitionId = resultSetPartitionId;
-            this.datasetPartitionWriter = datasetPartitionWriter;
+            this.resultState = resultState;
             prev = null;
             next = null;
         }
@@ -219,8 +218,8 @@
             return resultSetPartitionId;
         }
 
-        public IDatasetPartitionWriter getDatasetPartitionWriter() {
-            return datasetPartitionWriter;
+        public ResultState getResultState() {
+            return resultState;
         }
 
         public void setPrev(PartitionNode node) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 9b92a22..7b61021 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
@@ -64,7 +63,7 @@
                         for (int i = 0; i < resultStates.length; i++) {
                             ResultState state = resultStates[i];
                             if (state != null) {
-                                state.deinit();
+                                state.closeAndDelete();
                                 LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
                             }
                         }
@@ -141,7 +140,7 @@
             }
         }
 
-        IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+        DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
         dpr.writeTo(writer);
         LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":partition: " + partition);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index dbe3e4d..f47f5a9 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -20,14 +20,10 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
-import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
 
-public class DatasetPartitionReader implements IDatasetPartitionReader {
+public class DatasetPartitionReader {
     private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
 
     private final DatasetMemoryManager datasetMemoryManager;
@@ -36,51 +32,12 @@
 
     private final ResultState resultState;
 
-    private IFileHandle fileHandle;
-
     public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
         this.datasetMemoryManager = datasetMemoryManager;
         this.executor = executor;
         this.resultState = resultState;
     }
 
-    private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
-        long readSize = 0;
-        synchronized (resultState) {
-            while (offset >= resultState.getSize() && !resultState.getEOS()) {
-                try {
-                    resultState.wait();
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        }
-
-        if (offset >= resultState.getSize() && resultState.getEOS()) {
-            return readSize;
-        }
-
-        if (offset < resultState.getPersistentSize()) {
-            readSize = resultState.getIOManager().syncRead(fileHandle, offset, buffer);
-        }
-
-        if (readSize < buffer.capacity()) {
-            long localPageOffset = offset - resultState.getPersistentSize();
-            int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
-            int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
-            Page page = resultState.getPage(localPageIndex);
-            if (page == null) {
-                return readSize;
-            }
-            readSize += buffer.remaining();
-            buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
-        }
-
-        datasetMemoryManager.pageReferenced(resultState.getResultSetPartitionId());
-        return readSize;
-    }
-
-    @Override
     public void writeTo(final IFrameWriter writer) {
         executor.execute(new Runnable() {
             @Override
@@ -88,8 +45,7 @@
                 NetworkOutputChannel channel = (NetworkOutputChannel) writer;
                 channel.setFrameSize(resultState.getFrameSize());
                 try {
-                    fileHandle = resultState.getIOManager().open(resultState.getValidFileReference(),
-                            IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    resultState.readOpen();
                     channel.open();
                     try {
                         long offset = 0;
@@ -109,7 +65,7 @@
                         }
                     } finally {
                         channel.close();
-                        resultState.getIOManager().close(fileHandle);
+                        resultState.readClose();
                     }
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
@@ -120,6 +76,10 @@
                     LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
                 }
             }
+
+            private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+                return resultState.read(datasetMemoryManager, offset, buffer);
+            }
         });
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 317f553..c1f8f29 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -18,20 +18,17 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
-import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
-public class DatasetPartitionWriter implements IDatasetPartitionWriter {
+public class DatasetPartitionWriter implements IFrameWriter {
     private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
 
     private static final String FILE_PREFIX = "result_";
@@ -50,8 +47,6 @@
 
     private final ResultState resultState;
 
-    private IFileHandle fileHandle;
-
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
             ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
         this.manager = manager;
@@ -75,30 +70,12 @@
         }
         String fName = FILE_PREFIX + String.valueOf(partition);
         FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
-        fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
-                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-        resultState.init(fRef, fileHandle);
+        resultState.open(fRef);
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        int srcOffset = 0;
-        Page destPage = resultState.getLastPage();
-
-        while (srcOffset < buffer.limit()) {
-            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
-                destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
-                resultState.addPage(destPage);
-            }
-            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
-            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
-            srcOffset += srcLength;
-            resultState.incrementSize(srcLength);
-        }
-
-        synchronized (resultState) {
-            resultState.notifyAll();
-        }
+        resultState.write(datasetMemoryManager, buffer);
     }
 
     @Override
@@ -117,32 +94,10 @@
         }
 
         try {
-            synchronized (resultState) {
-                resultState.setEOS(true);
-                resultState.notifyAll();
-            }
+            resultState.close();
             manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
         }
     }
-
-    @Override
-    public Page returnPage() throws HyracksDataException {
-        Page page = resultState.removePage(0);
-
-        IIOManager ioManager = resultState.getIOManager();
-
-        // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
-        if (page == null) {
-            ioManager.close(fileHandle);
-            return null;
-        }
-
-        page.getBuffer().flip();
-
-        long delta = ioManager.syncWrite(fileHandle, resultState.getPersistentSize(), page.getBuffer());
-        resultState.incrementPersistentSize(delta);
-        return page;
-    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 661df93..097c663 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -17,12 +17,14 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
@@ -38,14 +40,14 @@
 
     private final AtomicBoolean eos;
 
-    private final AtomicBoolean readEOS;
-
     private final List<Page> localPageList;
 
     private FileReference fileRef;
 
     private IFileHandle writeFileHandle;
 
+    private IFileHandle readFileHandle;
+
     private long size;
 
     private long persistentSize;
@@ -55,20 +57,26 @@
         this.ioManager = ioManager;
         this.frameSize = frameSize;
         eos = new AtomicBoolean(false);
-        readEOS = new AtomicBoolean(false);
         localPageList = new ArrayList<Page>();
     }
 
-    public synchronized void init(FileReference fileRef, IFileHandle writeFileHandle) {
+    public synchronized void open(FileReference fileRef) throws HyracksDataException {
         this.fileRef = fileRef;
-        this.writeFileHandle = writeFileHandle;
+
+        writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
 
         size = 0;
         persistentSize = 0;
         notifyAll();
     }
 
-    public synchronized void deinit() {
+    public synchronized void close() {
+        eos.set(true);
+        notifyAll();
+    }
+
+    public synchronized void closeAndDelete() {
         if (writeFileHandle != null) {
             try {
                 ioManager.close(writeFileHandle);
@@ -79,6 +87,98 @@
         fileRef.delete();
     }
 
+    public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
+            throws HyracksDataException {
+        int srcOffset = 0;
+        Page destPage = getPage(localPageList.size() - 1);
+
+        while (srcOffset < buffer.limit()) {
+            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+                destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+                localPageList.add(destPage);
+            }
+            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+            srcOffset += srcLength;
+            size += srcLength;
+        }
+
+        notifyAll();
+    }
+
+    public synchronized void readOpen() throws InterruptedException, HyracksDataException {
+        while (fileRef == null) {
+            wait();
+        }
+        readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+    }
+
+    public synchronized void readClose() throws InterruptedException, HyracksDataException {
+        while (fileRef == null) {
+            wait();
+        }
+        readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+    }
+
+    public synchronized long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+            throws HyracksDataException {
+        long readSize = 0;
+        synchronized (this) {
+            while (offset >= size && !eos.get()) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        }
+
+        if (offset >= size && eos.get()) {
+            return readSize;
+        }
+
+        if (offset < persistentSize) {
+            readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+        }
+
+        if (readSize < buffer.capacity()) {
+            long localPageOffset = offset - persistentSize;
+            int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
+            int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
+            Page page = getPage(localPageIndex);
+            if (page == null) {
+                return readSize;
+            }
+            readSize += buffer.remaining();
+            buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+        }
+
+        datasetMemoryManager.pageReferenced(resultSetPartitionId);
+        return readSize;
+    }
+
+    public synchronized Page returnPage() throws HyracksDataException {
+        Page page = removePage(0);
+
+        // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
+        if (page == null) {
+            ioManager.close(writeFileHandle);
+            return null;
+        }
+
+        page.getBuffer().flip();
+
+        long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());
+        persistentSize += delta;
+        return page;
+    }
+
+    public synchronized void setEOS(boolean eos) {
+        this.eos.set(eos);
+    }
+
     public ResultSetPartitionId getResultSetPartitionId() {
         return resultSetPartitionId;
     }
@@ -91,76 +191,6 @@
         return ioManager;
     }
 
-    public synchronized void incrementSize(long delta) {
-        size += delta;
-    }
-
-    public synchronized long getSize() {
-        return size;
-    }
-
-    public synchronized void incrementPersistentSize(long delta) {
-        persistentSize += delta;
-    }
-
-    public synchronized long getPersistentSize() {
-        return persistentSize;
-    }
-
-    public void setEOS(boolean eos) {
-        this.eos.set(eos);
-    }
-
-    public boolean getEOS() {
-        return eos.get();
-    }
-
-    public boolean getReadEOS() {
-        return readEOS.get();
-    }
-
-    public synchronized void addPage(Page page) {
-        localPageList.add(page);
-    }
-
-    public synchronized Page removePage(int index) {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.remove(index);
-        }
-        return page;
-    }
-
-    public synchronized Page getPage(int index) {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.get(index);
-        }
-        return page;
-    }
-
-    public synchronized Page getLastPage() {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.get(localPageList.size() - 1);
-        }
-        return page;
-    }
-
-    public synchronized Page getFirstPage() {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.get(0);
-        }
-        return page;
-    }
-
-    public synchronized FileReference getValidFileReference() throws InterruptedException {
-        while (fileRef == null)
-            wait();
-        return fileRef;
-    }
-
     @Override
     public JobId getJobId() {
         return resultSetPartitionId.getJobId();
@@ -185,4 +215,20 @@
     public void fromBytes(DataInput in) throws IOException {
         throw new UnsupportedOperationException();
     }
+
+    private Page getPage(int index) {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.get(index);
+        }
+        return page;
+    }
+
+    private Page removePage(int index) {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.remove(index);
+        }
+        return page;
+    }
 }
\ No newline at end of file