Add support for maintaining tailPage pointers.
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 550e0a0..0cd9cec 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -47,6 +47,12 @@
private final List<Page> localPageList;
+ private final List<Integer> inMemoryList;
+
+ private Page tailPage;
+
+ private int tailPageLength;
+
private FileReference fileRef;
private IFileHandle writeFileHandle;
@@ -65,7 +71,10 @@
this.frameSize = frameSize;
eos = new AtomicBoolean(false);
localPageList = new ArrayList<Page>();
+ inMemoryList = new ArrayList<Integer>();
+ tailPage = null;
+ tailPageLength = 0;
fileRef = null;
writeFileHandle = null;
}
@@ -76,6 +85,11 @@
}
public synchronized void close() {
+ if (tailPage != null) {
+ localPageList.add(tailPage);
+ inMemoryList.add(localPageList.size() - 1);
+ size += tailPageLength;
+ }
eos.set(true);
notifyAll();
}
@@ -94,17 +108,23 @@
public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
throws HyracksDataException {
int srcOffset = 0;
- Page destPage = getPage(localPageList.size() - 1);
+
+ if (tailPage == null) {
+ tailPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+ }
while (srcOffset < buffer.limit()) {
- if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
- destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
- localPageList.add(destPage);
+ if (tailPage.getBuffer().remaining() <= 0) {
+ localPageList.add(tailPage);
+ inMemoryList.add(localPageList.size() - 1);
+ size += tailPageLength;
+ tailPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+ tailPageLength = 0;
}
- int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
- destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+ int srcLength = Math.min(buffer.limit() - srcOffset, tailPage.getBuffer().remaining());
+ tailPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
srcOffset += srcLength;
- size += srcLength;
+ tailPageLength += srcLength;
}
notifyAll();
@@ -161,7 +181,7 @@
}
public synchronized Page returnPage() throws HyracksDataException {
- Page page = removePage(0);
+ Page page = removePage();
// If we do not have any pages to be given back close the write channel since we don't write any more, return null.
if (page == null) {
@@ -233,10 +253,11 @@
return page;
}
- private Page removePage(int index) {
+ private Page removePage() {
Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.remove(index);
+ if (!inMemoryList.isEmpty()) {
+ int index = inMemoryList.get(inMemoryList.size() - 1);
+ page = localPageList.set(index, null);
}
return page;
}