More refactoring.
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 7b61021..b4703b9 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -84,7 +84,7 @@
             synchronized (this) {
                 ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
                         nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
-                dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
+                dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
 
                 ResultState[] resultStates = partitionResultStateMap.get(jobId);
                 if (resultStates == null) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index c1f8f29..2abbed5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -24,15 +24,13 @@
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
 public class DatasetPartitionWriter implements IFrameWriter {
     private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
 
-    private static final String FILE_PREFIX = "result_";
-
     private final IDatasetPartitionManager manager;
 
     private final JobId jobId;
@@ -48,7 +46,8 @@
     private final ResultState resultState;
 
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
-            ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
+            ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager,
+            IWorkspaceFileFactory fileFactory) {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
@@ -56,7 +55,7 @@
         this.datasetMemoryManager = datasetMemoryManager;
 
         resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
-        resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), ctx.getFrameSize());
+        resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), fileFactory, ctx.getFrameSize());
     }
 
     public ResultState getResultState() {
@@ -64,13 +63,11 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    public void open() {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("open(" + partition + ")");
         }
-        String fName = FILE_PREFIX + String.valueOf(partition);
-        FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
-        resultState.open(fRef);
+        resultState.open();
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 70d9714..a91db17 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -28,16 +28,21 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
 public class ResultState implements IStateObject {
+    private static final String FILE_PREFIX = "result_";
+
     private final ResultSetPartitionId resultSetPartitionId;
 
     private final int frameSize;
 
     private final IIOManager ioManager;
 
+    private final IWorkspaceFileFactory fileFactory;
+
     private final AtomicBoolean eos;
 
     private final List<Page> localPageList;
@@ -52,22 +57,22 @@
 
     private long persistentSize;
 
-    ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, int frameSize) {
+    ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, IWorkspaceFileFactory fileFactory,
+            int frameSize) {
         this.resultSetPartitionId = resultSetPartitionId;
         this.ioManager = ioManager;
+        this.fileFactory = fileFactory;
         this.frameSize = frameSize;
         eos = new AtomicBoolean(false);
         localPageList = new ArrayList<Page>();
 
+        fileRef = null;
         writeFileHandle = null;
     }
 
-    public synchronized void open(FileReference fileRef) throws HyracksDataException {
-        this.fileRef = fileRef;
-
+    public synchronized void open() {
         size = 0;
         persistentSize = 0;
-        notifyAll();
     }
 
     public synchronized void close() {
@@ -169,9 +174,12 @@
 
         page.getBuffer().flip();
 
-        if (writeFileHandle == null) {
+        if (fileRef == null) {
+            String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
             writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
                     IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            notifyAll();
         }
 
         long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());