Merge fullstack_rsd_fixes r3330:r3333.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3336 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index 296c502..a584b4b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -69,6 +69,9 @@
             int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
             int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
             Page page = resultState.getPage(localPageIndex);
+            if (page == null) {
+            	return readSize;
+            }
             readSize += buffer.remaining();
             buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
         }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index f6ae540..317f553 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -77,7 +77,7 @@
         FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
         fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-        resultState.init(fRef);
+        resultState.init(fRef, fileHandle);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 0f1d94c..661df93 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
@@ -43,6 +44,8 @@
 
     private FileReference fileRef;
 
+    private IFileHandle writeFileHandle;
+
     private long size;
 
     private long persistentSize;
@@ -56,8 +59,9 @@
         localPageList = new ArrayList<Page>();
     }
 
-    public synchronized void init(FileReference fileRef) {
+    public synchronized void init(FileReference fileRef, IFileHandle writeFileHandle) {
         this.fileRef = fileRef;
+        this.writeFileHandle = writeFileHandle;
 
         size = 0;
         persistentSize = 0;
@@ -65,6 +69,13 @@
     }
 
     public synchronized void deinit() {
+        if (writeFileHandle != null) {
+            try {
+                ioManager.close(writeFileHandle);
+            } catch (IOException e) {
+                // Since file handle could not be closed, just ignore.
+            }
+        }
         fileRef.delete();
     }