[NO ISSUE] Use Async Write Mode in ResultState
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Use Async write mode in ResultState to improve it's throughput.
- For concurrent reader, use the same file handle (since it's a
RandomAccessFile) for both read/write. Reference counting is used to
ensure the file is properly opened/closed.
Change-Id: Ia053f8e258759881583deb509425b585bc3c4bf2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2560
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index b832b20..6b35912 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -61,9 +61,9 @@
private FileReference fileRef;
- private IFileHandle writeFileHandle;
+ private IFileHandle fileHandle;
- private IFileHandle readFileHandle;
+ private volatile int referenceCount = 0;
private long size;
@@ -86,12 +86,13 @@
localPageList = new ArrayList<>();
fileRef = null;
- writeFileHandle = null;
+ fileHandle = null;
}
public synchronized void open() {
size = 0;
persistentSize = 0;
+ referenceCount = 0;
}
public synchronized void close() {
@@ -112,25 +113,29 @@
}
private void closeWriteFileHandle() {
- if (writeFileHandle != null) {
+ if (fileHandle != null) {
+ doCloseFileHandle();
+ }
+ }
+
+ private void doCloseFileHandle() {
+ if (--referenceCount == 0) {
+ // close the file if there is no more reference
try {
- ioManager.close(writeFileHandle);
+ ioManager.close(fileHandle);
} catch (IOException e) {
// Since file handle could not be closed, just ignore.
}
- writeFileHandle = null;
+ fileHandle = null;
}
}
public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
if (fileRef == null) {
- String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
- fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
- writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_SYNC);
+ initWriteFileHandle();
}
- size += ioManager.syncWrite(writeFileHandle, size, buffer);
+ size += ioManager.syncWrite(fileHandle, size, buffer);
notifyAll();
}
@@ -165,9 +170,8 @@
}
public synchronized void readClose() throws HyracksDataException {
- if (readFileHandle != null) {
- ioManager.close(readFileHandle);
- readFileHandle = null;
+ if (fileHandle != null) {
+ doCloseFileHandle();
}
}
@@ -185,51 +189,49 @@
return readSize;
}
- if (readFileHandle == null) {
+ if (fileHandle == null) {
initReadFileHandle();
}
- readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+ readSize = ioManager.syncRead(fileHandle, offset, buffer);
return readSize;
}
- public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+ public synchronized long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
throws HyracksDataException {
long readSize = 0;
- synchronized (this) {
- while (offset >= size && !eos.get() && !failed.get()) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw HyracksDataException.create(e);
- }
+ while (offset >= size && !eos.get() && !failed.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw HyracksDataException.create(e);
}
+ }
- if ((offset >= size && eos.get()) || failed.get()) {
+ if ((offset >= size && eos.get()) || failed.get()) {
+ return readSize;
+ }
+
+ if (offset < persistentSize) {
+ if (fileHandle == null) {
+ initReadFileHandle();
+ }
+ readSize = ioManager.syncRead(fileHandle, offset, buffer);
+ if (readSize < 0) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ }
+
+ if (readSize < buffer.capacity()) {
+ long localPageOffset = offset - persistentSize;
+ int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
+ int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
+ Page page = getPage(localPageIndex);
+ if (page == null) {
return readSize;
}
-
- if (offset < persistentSize) {
- if (readFileHandle == null) {
- initReadFileHandle();
- }
- readSize = ioManager.syncRead(readFileHandle, offset, buffer);
- if (readSize < 0) {
- throw new HyracksDataException("Premature end of file");
- }
- }
-
- if (readSize < buffer.capacity()) {
- long localPageOffset = offset - persistentSize;
- int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
- int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
- Page page = getPage(localPageIndex);
- if (page == null) {
- return readSize;
- }
- readSize += buffer.remaining();
- buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
- }
+ readSize += buffer.remaining();
+ buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
}
datasetMemoryManager.pageReferenced(resultSetPartitionId);
return readSize;
@@ -245,21 +247,17 @@
// If we do not have any pages to be given back close the write channel since we don't write any more, return null.
if (page == null) {
- ioManager.close(writeFileHandle);
+ ioManager.close(fileHandle);
return null;
}
page.getBuffer().flip();
if (fileRef == null) {
- String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
- fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
- writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- notifyAll();
+ initWriteFileHandle();
}
- long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());
+ long delta = ioManager.syncWrite(fileHandle, persistentSize, page.getBuffer());
persistentSize += delta;
return page;
}
@@ -325,8 +323,23 @@
return page;
}
+ private void initWriteFileHandle() throws HyracksDataException {
+ if (fileHandle == null) {
+ String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+ fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+ fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ if (referenceCount != 0) {
+ throw new IllegalStateException("Illegal reference count " + referenceCount);
+ }
+ referenceCount = 1;
+ notifyAll(); // NOSONAR: always called from a synchronized block
+ }
+ }
+
private void initReadFileHandle() throws HyracksDataException {
while (fileRef == null && !failed.get()) {
+ // wait for writer to create the file
try {
wait();
} catch (InterruptedException e) {
@@ -336,9 +349,12 @@
if (failed.get()) {
return;
}
-
- readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ if (fileHandle == null) {
+ // fileHandle has been closed by the writer, create it again
+ fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ }
+ referenceCount++;
}
@Override