Merge fullstack_rsd_fixes r3330:r3333.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3336 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index 296c502..a584b4b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -69,6 +69,9 @@
int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
Page page = resultState.getPage(localPageIndex);
+ if (page == null) {
+ return readSize;
+ }
readSize += buffer.remaining();
buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index f6ae540..317f553 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -77,7 +77,7 @@
FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- resultState.init(fRef);
+ resultState.init(fRef, fileHandle);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 0f1d94c..661df93 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
@@ -43,6 +44,8 @@
private FileReference fileRef;
+ private IFileHandle writeFileHandle;
+
private long size;
private long persistentSize;
@@ -56,8 +59,9 @@
localPageList = new ArrayList<Page>();
}
- public synchronized void init(FileReference fileRef) {
+ public synchronized void init(FileReference fileRef, IFileHandle writeFileHandle) {
this.fileRef = fileRef;
+ this.writeFileHandle = writeFileHandle;
size = 0;
persistentSize = 0;
@@ -65,6 +69,13 @@
}
public synchronized void deinit() {
+ if (writeFileHandle != null) {
+ try {
+ ioManager.close(writeFileHandle);
+ } catch (IOException e) {
+ // Since file handle could not be closed, just ignore.
+ }
+ }
fileRef.delete();
}