[NO ISSUE] Use Async Write Mode in ResultState

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Use Async write mode in ResultState to improve it's throughput.
- For concurrent reader, use the same file handle (since it's a
RandomAccessFile) for both read/write. Reference counting is used to
ensure the file is properly opened/closed.

Change-Id: Ia053f8e258759881583deb509425b585bc3c4bf2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2560
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index b832b20..6b35912 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -61,9 +61,9 @@
 
     private FileReference fileRef;
 
-    private IFileHandle writeFileHandle;
+    private IFileHandle fileHandle;
 
-    private IFileHandle readFileHandle;
+    private volatile int referenceCount = 0;
 
     private long size;
 
@@ -86,12 +86,13 @@
         localPageList = new ArrayList<>();
 
         fileRef = null;
-        writeFileHandle = null;
+        fileHandle = null;
     }
 
     public synchronized void open() {
         size = 0;
         persistentSize = 0;
+        referenceCount = 0;
     }
 
     public synchronized void close() {
@@ -112,25 +113,29 @@
     }
 
     private void closeWriteFileHandle() {
-        if (writeFileHandle != null) {
+        if (fileHandle != null) {
+            doCloseFileHandle();
+        }
+    }
+
+    private void doCloseFileHandle() {
+        if (--referenceCount == 0) {
+            // close the file if there is no more reference
             try {
-                ioManager.close(writeFileHandle);
+                ioManager.close(fileHandle);
             } catch (IOException e) {
                 // Since file handle could not be closed, just ignore.
             }
-            writeFileHandle = null;
+            fileHandle = null;
         }
     }
 
     public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
         if (fileRef == null) {
-            String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
-            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
-            writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
-                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_SYNC);
+            initWriteFileHandle();
         }
 
-        size += ioManager.syncWrite(writeFileHandle, size, buffer);
+        size += ioManager.syncWrite(fileHandle, size, buffer);
         notifyAll();
     }
 
@@ -165,9 +170,8 @@
     }
 
     public synchronized void readClose() throws HyracksDataException {
-        if (readFileHandle != null) {
-            ioManager.close(readFileHandle);
-            readFileHandle = null;
+        if (fileHandle != null) {
+            doCloseFileHandle();
         }
     }
 
@@ -185,51 +189,49 @@
             return readSize;
         }
 
-        if (readFileHandle == null) {
+        if (fileHandle == null) {
             initReadFileHandle();
         }
-        readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+        readSize = ioManager.syncRead(fileHandle, offset, buffer);
 
         return readSize;
     }
 
-    public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+    public synchronized long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
             throws HyracksDataException {
         long readSize = 0;
-        synchronized (this) {
-            while (offset >= size && !eos.get() && !failed.get()) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    throw HyracksDataException.create(e);
-                }
+        while (offset >= size && !eos.get() && !failed.get()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw HyracksDataException.create(e);
             }
+        }
 
-            if ((offset >= size && eos.get()) || failed.get()) {
+        if ((offset >= size && eos.get()) || failed.get()) {
+            return readSize;
+        }
+
+        if (offset < persistentSize) {
+            if (fileHandle == null) {
+                initReadFileHandle();
+            }
+            readSize = ioManager.syncRead(fileHandle, offset, buffer);
+            if (readSize < 0) {
+                throw new HyracksDataException("Premature end of file");
+            }
+        }
+
+        if (readSize < buffer.capacity()) {
+            long localPageOffset = offset - persistentSize;
+            int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
+            int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
+            Page page = getPage(localPageIndex);
+            if (page == null) {
                 return readSize;
             }
-
-            if (offset < persistentSize) {
-                if (readFileHandle == null) {
-                    initReadFileHandle();
-                }
-                readSize = ioManager.syncRead(readFileHandle, offset, buffer);
-                if (readSize < 0) {
-                    throw new HyracksDataException("Premature end of file");
-                }
-            }
-
-            if (readSize < buffer.capacity()) {
-                long localPageOffset = offset - persistentSize;
-                int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
-                int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
-                Page page = getPage(localPageIndex);
-                if (page == null) {
-                    return readSize;
-                }
-                readSize += buffer.remaining();
-                buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
-            }
+            readSize += buffer.remaining();
+            buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
         }
         datasetMemoryManager.pageReferenced(resultSetPartitionId);
         return readSize;
@@ -245,21 +247,17 @@
 
         // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
         if (page == null) {
-            ioManager.close(writeFileHandle);
+            ioManager.close(fileHandle);
             return null;
         }
 
         page.getBuffer().flip();
 
         if (fileRef == null) {
-            String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
-            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
-            writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
-                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-            notifyAll();
+            initWriteFileHandle();
         }
 
-        long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());
+        long delta = ioManager.syncWrite(fileHandle, persistentSize, page.getBuffer());
         persistentSize += delta;
         return page;
     }
@@ -325,8 +323,23 @@
         return page;
     }
 
+    private void initWriteFileHandle() throws HyracksDataException {
+        if (fileHandle == null) {
+            String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+            fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            if (referenceCount != 0) {
+                throw new IllegalStateException("Illegal reference count " + referenceCount);
+            }
+            referenceCount = 1;
+            notifyAll(); // NOSONAR: always called from a synchronized block
+        }
+    }
+
     private void initReadFileHandle() throws HyracksDataException {
         while (fileRef == null && !failed.get()) {
+            // wait for writer to create the file
             try {
                 wait();
             } catch (InterruptedException e) {
@@ -336,9 +349,12 @@
         if (failed.get()) {
             return;
         }
-
-        readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
-                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        if (fileHandle == null) {
+            // fileHandle has been closed by the writer, create it again
+            fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        }
+        referenceCount++;
     }
 
     @Override