Lock conservatively for writes and don't lock result state at all for references.
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index 4a3c72d..8fef455 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -49,8 +49,8 @@
resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
}
- public Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
- throws OutOfMemoryError, HyracksDataException {
+ public synchronized Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
+ throws HyracksDataException {
Page page;
if (availPages.isEmpty()) {
if (availableMemory >= FRAME_SIZE) {
@@ -94,8 +94,7 @@
resultPartitionNodesMap.put(resultSetPartitionId, pn);
}
- protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
- ResultState resultState) {
+ protected PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
PartitionNode pn = null;
if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
@@ -105,14 +104,16 @@
}
return pn;
}
- pn = resultPartitionNodesMap.get(resultSetPartitionId);
- leastRecentlyUsedList.remove(pn);
- insertPartitionNode(resultSetPartitionId, pn);
+ synchronized (this) {
+ pn = resultPartitionNodesMap.get(resultSetPartitionId);
+ leastRecentlyUsedList.remove(pn);
+ insertPartitionNode(resultSetPartitionId, pn);
+ }
return pn;
}
- protected synchronized Page evictPage() throws HyracksDataException {
+ protected Page evictPage() throws HyracksDataException {
PartitionNode pn = leastRecentlyUsedList.getFirst();
ResultState resultState = pn.getResultState();
Page page = resultState.returnPage();
@@ -144,7 +145,7 @@
return page;
}
- protected synchronized Page getAvailablePage() {
+ protected Page getAvailablePage() {
Iterator<Page> iter = availPages.iterator();
Page page = iter.next();
iter.remove();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 73044a1..09d1bff 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -49,12 +49,6 @@
private final List<Page> localPageList;
- private final List<Integer> inMemoryList;
-
- private Page tailPage;
-
- private int tailPageLength;
-
private FileReference fileRef;
private IFileHandle writeFileHandle;
@@ -74,10 +68,7 @@
eos = new AtomicBoolean(false);
failed = new AtomicBoolean(false);
localPageList = new ArrayList<Page>();
- inMemoryList = new ArrayList<Integer>();
- tailPage = null;
- tailPageLength = 0;
fileRef = null;
writeFileHandle = null;
}
@@ -88,16 +79,14 @@
}
public synchronized void close() {
- if (tailPage != null) {
- localPageList.add(tailPage);
- inMemoryList.add(localPageList.size() - 1);
- size += tailPageLength;
- }
eos.set(true);
notifyAll();
}
public synchronized void closeAndDelete() {
+ // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs
+ // to be taken when there are more requests to these result states.
+ failed.set(true);
if (writeFileHandle != null) {
try {
ioManager.close(writeFileHandle);
@@ -113,23 +102,21 @@
public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
throws HyracksDataException {
int srcOffset = 0;
+ Page destPage = null;
- if (tailPage == null) {
- tailPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+ if (!localPageList.isEmpty()) {
+ destPage = localPageList.get(localPageList.size() - 1);
}
while (srcOffset < buffer.limit()) {
- if (tailPage.getBuffer().remaining() <= 0) {
- localPageList.add(tailPage);
- inMemoryList.add(localPageList.size() - 1);
- size += tailPageLength;
- tailPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
- tailPageLength = 0;
+ if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+ destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+ localPageList.add(destPage);
}
- int srcLength = Math.min(buffer.limit() - srcOffset, tailPage.getBuffer().remaining());
- tailPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+ int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+ destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
srcOffset += srcLength;
- tailPageLength += srcLength;
+ size += srcLength;
}
notifyAll();
@@ -145,7 +132,7 @@
}
}
- public synchronized long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+ public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
throws HyracksDataException {
long readSize = 0;
synchronized (this) {
@@ -156,31 +143,30 @@
throw new HyracksDataException(e);
}
}
- }
- if ((offset >= size && eos.get()) || failed.get()) {
- return readSize;
- }
-
- if (offset < persistentSize) {
- if (readFileHandle == null) {
- initReadFileHandle();
- }
- readSize = ioManager.syncRead(readFileHandle, offset, buffer);
- }
-
- if (readSize < buffer.capacity()) {
- long localPageOffset = offset - persistentSize;
- int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
- int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
- Page page = getPage(localPageIndex);
- if (page == null) {
+ if ((offset >= size && eos.get()) || failed.get()) {
return readSize;
}
- readSize += buffer.remaining();
- buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
- }
+ if (offset < persistentSize) {
+ if (readFileHandle == null) {
+ initReadFileHandle();
+ }
+ readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+ }
+
+ if (readSize < buffer.capacity()) {
+ long localPageOffset = offset - persistentSize;
+ int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
+ int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
+ Page page = getPage(localPageIndex);
+ if (page == null) {
+ return readSize;
+ }
+ readSize += buffer.remaining();
+ buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+ }
+ }
datasetMemoryManager.pageReferenced(resultSetPartitionId);
return readSize;
}
@@ -265,9 +251,8 @@
private Page removePage() {
Page page = null;
- if (!inMemoryList.isEmpty()) {
- int index = inMemoryList.get(inMemoryList.size() - 1);
- page = localPageList.set(index, null);
+ if (!localPageList.isEmpty()) {
+ page = localPageList.remove(localPageList.size() - 1);
}
return page;
}