[ASTERIXDB-2040][STO] Reopen closed file due to interrupts
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- When a file channel is closed due to interruption,
reopen the file to allow new readers to read it.
- Add a test case to test the new behavior.
Change-Id: I06d7719801282dbf4a4c16ec3d1cdcac2a62e631
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1942
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index f71dcdf..9054377 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -29,11 +29,10 @@
import org.apache.hyracks.api.io.IIOManager;
public class FileHandle implements IFileHandle {
+
private final FileReference fileRef;
-
private RandomAccessFile raf;
-
- private FileChannel channel;
+ private String mode;
public FileHandle(FileReference fileRef) {
this.fileRef = fileRef;
@@ -47,7 +46,6 @@
* @throws IOException
*/
public void open(IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode) throws IOException {
- String mode;
if (!fileRef.getFile().exists()) {
throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, fileRef.getAbsolutePath());
}
@@ -60,15 +58,12 @@
case METADATA_ASYNC_DATA_ASYNC:
mode = "rw";
break;
-
case METADATA_ASYNC_DATA_SYNC:
mode = "rwd";
break;
-
case METADATA_SYNC_DATA_SYNC:
mode = "rws";
break;
-
default:
throw new IllegalArgumentException();
}
@@ -77,7 +72,7 @@
default:
throw new IllegalArgumentException();
}
- raf = new RandomAccessFile(fileRef.getFile(), mode);
+ ensureOpen();
}
public void close() throws IOException {
@@ -94,10 +89,16 @@
}
public FileChannel getFileChannel() {
- if (channel == null) {
- channel = raf.getChannel();
- }
- return channel;
+ return raf.getChannel();
}
+ public synchronized void ensureOpen() throws HyracksDataException {
+ if (raf == null || !raf.getChannel().isOpen()) {
+ try {
+ raf = new RandomAccessFile(fileRef.getFile(), mode);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 10c7415..3da30ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -22,6 +22,7 @@
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -203,10 +204,13 @@
n += len;
}
return n;
- } catch (HyracksDataException e) {
- throw e;
+ } catch (ClosedByInterruptException e) {
+ Thread.currentThread().interrupt();
+ // re-open the closed channel. The channel will be closed during the typical file lifecycle
+ ((FileHandle) fHandle).ensureOpen();
+ throw HyracksDataException.create(e);
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java
new file mode 100644
index 0000000..90b882c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.FileHandle;
+import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IOManagerTest {
+
+ private static File testFile;
+
+ @Test
+ public void interruptedReadTest() throws IOException, InterruptedException {
+ final IIOManager ioManager = TestStorageManagerComponentHolder.getIOManager();
+ final String fileName = String.valueOf(System.currentTimeMillis());
+ final FileReference fileRef = ioManager.resolve(fileName);
+ testFile = fileRef.getFile();
+ // create the file
+ IoUtil.create(fileRef);
+ // open file handle
+ final FileHandle fileHandle = (FileHandle) ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ int theOnlyOne = 1;
+ // write integer into the file
+ final ByteBuffer writeBuffer = ByteBuffer.allocate(Integer.BYTES).putInt(theOnlyOne);
+ writeBuffer.flip();
+ ioManager.syncWrite(fileHandle, 0, writeBuffer);
+
+ final ByteBuffer readBuffer = ByteBuffer.allocate(Integer.BYTES);
+ Thread interruptedReader = new Thread(() -> {
+ try {
+ Thread.currentThread().interrupt();
+ // The file handle will be closed by ClosedByInterruptException
+ ioManager.syncRead(fileHandle, 0, readBuffer);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ interruptedReader.start();
+ interruptedReader.join();
+ // we should still be able to read from the file handle
+ ioManager.syncRead(fileHandle, 0, readBuffer);
+ Assert.assertEquals(theOnlyOne, readBuffer.getInt(0));
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ FileUtils.deleteQuietly(testFile);
+ }
+}