[ASTERIXDB-3184][STO] Unify I/O APIs

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Unify file operations by using IIOManager

Change-Id: Ifb77c24ee855537bcf725b2eb1e28b1af200f3ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17524
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 290734f..2ed1638 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -18,15 +18,11 @@
  */
 package org.apache.asterix.app.nc;
 
-import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.channels.ClosedByInterruptException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 
@@ -34,9 +30,11 @@
 import org.apache.asterix.common.storage.IndexCheckpoint;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.util.annotations.ThreadSafe;
-import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -49,10 +47,12 @@
     private static final FilenameFilter CHECKPOINT_FILE_FILTER =
             (file, name) -> name.startsWith(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX);
     private static final long BULKLOAD_LSN = 0;
-    private final Path indexPath;
+    private final FileReference indexPath;
+    private final IIOManager ioManager;
 
-    public IndexCheckpointManager(Path indexPath) {
+    public IndexCheckpointManager(FileReference indexPath, IIOManager ioManager) {
         this.indexPath = indexPath;
+        this.ioManager = ioManager;
     }
 
     @Override
@@ -154,7 +154,7 @@
         }
         if (checkpoints.isEmpty()) {
             LOGGER.warn("Couldn't find any checkpoint file for index {}. Content of dir are {}.", indexPath,
-                    Arrays.toString(indexPath.toFile().listFiles()));
+                    ioManager.getMatchingFiles(indexPath, IoUtil.NO_OP_FILTER).toString());
             throw new IllegalStateException("Couldn't find any checkpoints for resource: " + indexPath);
         }
         checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
@@ -180,13 +180,13 @@
         }
     }
 
-    private List<IndexCheckpoint> getCheckpoints() throws ClosedByInterruptException {
+    private List<IndexCheckpoint> getCheckpoints() throws ClosedByInterruptException, HyracksDataException {
         List<IndexCheckpoint> checkpoints = new ArrayList<>();
-        final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
-        if (checkpointFiles != null) {
-            for (File checkpointFile : checkpointFiles) {
+        final Collection<FileReference> checkpointFiles = ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+        if (!checkpointFiles.isEmpty()) {
+            for (FileReference checkpointFile : checkpointFiles) {
                 try {
-                    checkpoints.add(read(checkpointFile.toPath()));
+                    checkpoints.add(read(checkpointFile));
                 } catch (ClosedByInterruptException e) {
                     throw e;
                 } catch (IOException e) {
@@ -198,14 +198,14 @@
     }
 
     private void persist(IndexCheckpoint checkpoint) throws HyracksDataException {
-        final Path checkpointPath = getCheckpointPath(checkpoint);
+        final FileReference checkpointPath = getCheckpointPath(checkpoint);
         for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
             try {
                 // clean up from previous write failure
-                if (checkpointPath.toFile().exists()) {
-                    Files.delete(checkpointPath);
+                if (ioManager.exists(checkpointPath)) {
+                    ioManager.delete(checkpointPath);
                 }
-                FileUtil.writeAndForce(checkpointPath, checkpoint.asJson().getBytes());
+                ioManager.overwrite(checkpointPath, checkpoint.asJson().getBytes());
                 // ensure it was written correctly by reading it
                 read(checkpointPath);
                 return;
@@ -223,17 +223,18 @@
         }
     }
 
-    private IndexCheckpoint read(Path checkpointPath) throws IOException {
-        return IndexCheckpoint.fromJson(new String(Files.readAllBytes(checkpointPath)));
+    private IndexCheckpoint read(FileReference checkpointPath) throws IOException {
+        return IndexCheckpoint.fromJson(new String(ioManager.readAllBytes(checkpointPath)));
     }
 
     private void deleteHistory(long latestId, int historyToKeep) {
         try {
-            final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
-            if (checkpointFiles != null) {
-                for (File checkpointFile : checkpointFiles) {
-                    if (getCheckpointIdFromFileName(checkpointFile.toPath()) < (latestId - historyToKeep)) {
-                        Files.delete(checkpointFile.toPath());
+            final Collection<FileReference> checkpointFiles =
+                    ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+            if (!checkpointFiles.isEmpty()) {
+                for (FileReference checkpointFile : checkpointFiles) {
+                    if (getCheckpointIdFromFileName(checkpointFile) < (latestId - historyToKeep)) {
+                        ioManager.delete(checkpointFile);
                     }
                 }
             }
@@ -242,13 +243,12 @@
         }
     }
 
-    private Path getCheckpointPath(IndexCheckpoint checkpoint) {
-        return Paths.get(indexPath.toString(),
-                StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX + String.valueOf(checkpoint.getId()));
+    private FileReference getCheckpointPath(IndexCheckpoint checkpoint) {
+        return indexPath.getChild(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX + checkpoint.getId());
     }
 
-    private long getCheckpointIdFromFileName(Path checkpointPath) {
-        return Long.valueOf(checkpointPath.getFileName().toString()
-                .substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
+    private long getCheckpointIdFromFileName(FileReference checkpointPath) {
+        return Long
+                .parseLong(checkpointPath.getName().substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
index e0b3105..1e08ed8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.app.nc;
 
-import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -27,6 +26,7 @@
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 
 public class IndexCheckpointManagerProvider implements IIndexCheckpointManagerProvider {
@@ -54,8 +54,8 @@
 
     private IndexCheckpointManager create(ResourceReference ref) {
         try {
-            final Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
-            return new IndexCheckpointManager(indexPath);
+            final FileReference indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
+            return new IndexCheckpointManager(indexPath, ioManager);
         } catch (HyracksDataException e) {
             throw new IllegalStateException(e);
         }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
index 6f4ce69..97d8d80 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
@@ -18,7 +18,8 @@
  */
 package org.apache.asterix.cloud;
 
-import static org.apache.asterix.common.utils.StorageConstants.*;
+import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -151,6 +152,7 @@
         super.close(fHandle);
     }
 
+    // TODO This method should not do any syncing. It simply should list the files
     @Override
     public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
         Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
@@ -223,6 +225,14 @@
     }
 
     @Override
+    public long getSize(FileReference fileReference) {
+        if (!fileReference.getFile().exists()) {
+            return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
+        }
+        return super.getSize(fileReference);
+    }
+
+    @Override
     public void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException {
         super.overwrite(fileRef, bytes);
         // Write here will overwrite the older object if exists
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 1189b51..999636d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -21,8 +21,6 @@
 
 import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
 
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Deque;
@@ -77,7 +75,7 @@
     private long firstLsnForCurrentMemoryComponent = 0L;
     private long persistenceLsn = 0L;
     private int pendingFlushes = 0;
-    private Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
+    private final Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
 
     public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId componentId,
             IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
@@ -307,10 +305,8 @@
 
     private static FileReference getOperationMaskFilePath(ILSMIOOperation operation) {
         FileReference target = operation.getTarget();
-        final String componentSequence = getComponentSequence(target.getFile().getAbsolutePath());
-        Path idxRelPath = Paths.get(target.getRelativePath()).getParent();
-        Path maskFileRelPath =
-                Paths.get(idxRelPath.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
-        return new FileReference(target.getDeviceHandle(), maskFileRelPath.toString());
+        String componentSequence = getComponentSequence(target.getFile().getAbsolutePath());
+        FileReference idxRelPath = target.getParent();
+        return idxRelPath.getChild(StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index a9ed066..9702b18 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.common.utils;
 
 import java.io.File;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Iterator;
 
@@ -31,6 +30,7 @@
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.DefaultIoDeviceFileSplit;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.MappedFileSplit;
@@ -180,7 +180,7 @@
      * @return
      * @throws HyracksDataException
      */
-    public static Path getIndexPath(IIOManager ioManager, ResourceReference ref) throws HyracksDataException {
-        return ioManager.resolve(ref.getRelativePath().toString()).getFile().toPath();
+    public static FileReference getIndexPath(IIOManager ioManager, ResourceReference ref) throws HyracksDataException {
+        return ioManager.resolve(ref.getRelativePath().toString());
     }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 97b6556..a6ba0e2 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -24,7 +24,6 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Path;
 import java.util.Collection;
 
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -36,6 +35,7 @@
 import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
@@ -69,14 +69,15 @@
             DatasetResourceReference ref = DatasetResourceReference.of(ls);
             final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref);
             // Get most recent sequence of existing files to avoid deletion
-            Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
-            String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+            FileReference indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
+            Collection<FileReference> files =
+                    ioManager.getMatchingFiles(indexPath, AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
             if (files == null) {
                 throw HyracksDataException
                         .create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
             }
             long maxComponentSequence = UNINITIALIZED_COMPONENT_SEQ;
-            for (String file : files) {
+            for (FileReference file : files) {
                 maxComponentSequence =
                         Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
             }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index 3f04bd2..e9af85c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -22,9 +22,6 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
@@ -50,21 +47,22 @@
     @Override
     public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
         try {
+            IIOManager ioManager = appCtx.getIoManager();
             // create mask
-            final Path maskPath = getComponentMaskPath(appCtx, file);
-            Files.createFile(maskPath);
+            final FileReference maskPath = getComponentMaskPath(ioManager, file);
+            ioManager.create(maskPath);
             ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
         } catch (IOException e) {
             throw new ReplicationException(e);
         }
     }
 
-    public static Path getComponentMaskPath(INcApplicationContext appCtx, String componentFile) throws IOException {
-        final IIOManager ioManager = appCtx.getIoManager();
+    public static FileReference getComponentMaskPath(IIOManager ioManager, String componentFile) throws IOException {
         final FileReference localPath = ioManager.resolve(componentFile);
-        final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
+        final FileReference resourceDir = localPath.getParent();
+        ioManager.makeDirectories(resourceDir);
         final String componentSequence = ResourceReference.getComponentSequence(componentFile);
-        return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
+        return resourceDir.getChild(StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
     }
 
     @Override
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
index 92e4989..a00acfb 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -20,10 +20,8 @@
 
 import java.io.DataInput;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Files;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
@@ -32,6 +30,7 @@
 import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -52,9 +51,9 @@
     public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
         try {
             final IIOManager ioManager = appCtx.getIoManager();
-            final File localFile = ioManager.resolve(file).getFile();
-            if (localFile.exists()) {
-                Files.delete(localFile.toPath());
+            final FileReference localFile = ioManager.resolve(file);
+            if (ioManager.exists(localFile)) {
+                ioManager.delete(localFile);
                 ResourceReference replicaRes = ResourceReference.of(localFile.getAbsolutePath());
                 if (replicaRes.isMetadataResource()) {
                     ((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
index 483561b..2b0e2d8 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
@@ -20,7 +20,6 @@
 
 import java.io.DataInput;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -31,8 +30,8 @@
 import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -52,10 +51,10 @@
     public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
         try {
             final IIOManager ioManager = appCtx.getIoManager();
-            final File indexFile = ioManager.resolve(file).getFile();
-            if (indexFile.exists()) {
-                File indexDir = indexFile.getParentFile();
-                IoUtil.delete(indexDir);
+            final FileReference indexFile = ioManager.resolve(file);
+            if (ioManager.exists(indexFile)) {
+                FileReference indexDir = indexFile.getParent();
+                ioManager.deleteDirectory(indexDir);
                 ((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
                         .invalidateResource(ResourceReference.of(file).getRelativePath().toString());
                 LOGGER.info(() -> "Deleted index: " + indexFile.getAbsolutePath());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index fa77378..76fde09 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -22,8 +22,6 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -36,6 +34,8 @@
 import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.replication.sync.IndexSynchronizer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.util.ThreadDumpUtil;
 import org.apache.logging.log4j.LogManager;
@@ -67,9 +67,10 @@
             } else if (masterLsn != IndexSynchronizer.MERGE_LSN) {
                 ensureComponentLsnFlushed(appCtx);
             }
+            IIOManager ioManager = appCtx.getIoManager();
             // delete mask
-            final Path maskPath = ComponentMaskTask.getComponentMaskPath(appCtx, file);
-            Files.delete(maskPath);
+            final FileReference maskPath = ComponentMaskTask.getComponentMaskPath(ioManager, file);
+            ioManager.delete(maskPath);
             ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
         } catch (IOException | InterruptedException e) {
             throw new ReplicationException(e);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 500a5de..71ed63e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -24,11 +24,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
@@ -40,10 +36,10 @@
 import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.FileHandle;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -73,31 +69,34 @@
             final IIOManager ioManager = appCtx.getIoManager();
             // resolve path
             final FileReference localPath = ioManager.resolve(file);
-            final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
+            FileReference resourceDir = localPath.getParent();
+            ioManager.makeDirectories(resourceDir);
             if (indexMetadata) {
                 // ensure clean index directory
-                FileUtils.cleanDirectory(resourceDir.toFile());
+                ioManager.cleanDirectory(resourceDir);
                 ((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
                         .invalidateResource(ResourceReference.of(file).getRelativePath().toString());
             }
             // create mask
-            final Path maskPath = Paths.get(resourceDir.toString(),
-                    StorageConstants.MASK_FILE_PREFIX + localPath.getFile().getName());
-            Files.createFile(maskPath);
+            final FileReference maskPath =
+                    resourceDir.getChild(StorageConstants.MASK_FILE_PREFIX + localPath.getName());
+            ioManager.create(maskPath);
             // receive actual file
-            final Path filePath = Paths.get(resourceDir.toString(), localPath.getFile().getName());
-            Files.createFile(filePath);
-            try (RandomAccessFile fileOutputStream = new RandomAccessFile(filePath.toFile(), "rw");
-                    FileChannel fileChannel = fileOutputStream.getChannel()) {
-                fileOutputStream.setLength(size);
+            ioManager.create(localPath);
+            FileHandle fileHandle = (FileHandle) ioManager.open(localPath, IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            try (FileChannel fileChannel = fileHandle.getFileChannel()) {
+                fileHandle.setLength(size);
                 NetworkingUtil.downloadFile(fileChannel, worker.getChannel());
-                fileChannel.force(true);
+                ioManager.sync(fileHandle, true);
+            } finally {
+                ioManager.close(fileHandle);
             }
             if (indexMetadata) {
                 initIndexCheckpoint(appCtx);
             }
             //delete mask
-            Files.delete(maskPath);
+            ioManager.delete(maskPath);
             LOGGER.debug("received file {} from master", localPath);
             ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
         } catch (IOException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 1e81346..8ef55e8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -21,6 +21,7 @@
 import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
 import static org.apache.asterix.common.utils.StorageConstants.INDEX_NON_DATA_FILES_PREFIX;
 import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
 import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
 import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;
@@ -28,12 +29,8 @@
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,7 +41,6 @@
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -59,7 +55,6 @@
 import org.apache.asterix.common.storage.ResourceStorageStats;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
@@ -68,14 +63,12 @@
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.util.ExitUtil;
-import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -88,18 +81,20 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     private static final String METADATA_FILE_MASK_NAME =
             StorageConstants.MASK_FILE_PREFIX + StorageConstants.METADATA_FILE_NAME;
     private static final FilenameFilter LSM_INDEX_FILES_FILTER =
             (dir, name) -> name.startsWith(METADATA_FILE_NAME) || !name.startsWith(INDEX_NON_DATA_FILES_PREFIX);
     private static final FilenameFilter MASK_FILES_FILTER =
             (dir, name) -> name.startsWith(StorageConstants.MASK_FILE_PREFIX);
-    private static final int MAX_CACHED_RESOURCES = 1000;
     private static final FilenameFilter METADATA_FILES_FILTER =
             (dir, name) -> name.equals(StorageConstants.METADATA_FILE_NAME);
     private static final FilenameFilter METADATA_MASK_FILES_FILTER =
             (dir, name) -> name.equals(METADATA_FILE_MASK_NAME);
 
+    private static final int MAX_CACHED_RESOURCES = 1000;
+
     // Finals
     private final IIOManager ioManager;
     private final Cache<String, LocalResource> resourceCache;
@@ -107,7 +102,7 @@
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
     private IReplicationManager replicationManager;
-    private final List<Path> storageRoots;
+    private final List<FileReference> storageRoots;
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     private final IPersistedResourceRegistry persistedResourceRegistry;
 
@@ -120,8 +115,7 @@
         storageRoots = new ArrayList<>();
         final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
         for (int i = 0; i < ioDevices.size(); i++) {
-            storageRoots.add(
-                    Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME));
+            storageRoots.add(new FileReference(ioDevices.get(i), STORAGE_ROOT_DIR_NAME));
         }
         createStorageRoots();
         resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -131,7 +125,7 @@
     public String toString() {
         StringBuilder aString = new StringBuilder().append(PersistentLocalResourceRepository.class.getSimpleName())
                 .append(Character.LINE_SEPARATOR).append(ioManager.getClass().getSimpleName()).append(':')
-                .append(Character.LINE_SEPARATOR).append(ioManager.toString()).append(Character.LINE_SEPARATOR)
+                .append(Character.LINE_SEPARATOR).append(ioManager).append(Character.LINE_SEPARATOR)
                 .append("Cached Resources:").append(Character.LINE_SEPARATOR);
         resourceCache.asMap().forEach(
                 (key, value) -> aString.append(key).append("->").append(value).append(Character.LINE_SEPARATOR));
@@ -143,8 +137,8 @@
         LocalResource resource = resourceCache.getIfPresent(relativePath);
         if (resource == null) {
             FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
-            if (resourceFile.getFile().exists()) {
-                resource = readLocalResource(resourceFile.getFile());
+            resource = readLocalResource(resourceFile);
+            if (resource != null) {
                 resourceCache.put(relativePath, resource);
             }
         }
@@ -162,15 +156,15 @@
                 throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
             }
 
-            final File parent = resourceFile.getFile().getParentFile();
-            if (!parent.exists() && !parent.mkdirs()) {
+            final FileReference parent = resourceFile.getParent();
+            if (!ioManager.exists(parent) && !ioManager.makeDirectories(parent)) {
                 throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath());
             }
             // The next block should be all or nothing
             try {
                 createResourceFileMask(resourceFile);
                 byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
-                FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), bytes);
+                ioManager.overwrite(resourceFile, bytes);
                 indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(
                         UNINITIALIZED_COMPONENT_SEQ, 0, LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), null);
                 deleteResourceFileMask(resourceFile);
@@ -199,7 +193,7 @@
     private void cleanup(FileReference resourceFile) {
         if (resourceFile.getFile().exists()) {
             try {
-                IoUtil.delete(resourceFile);
+                ioManager.delete(resourceFile);
             } catch (Throwable th) {
                 LOGGER.error("Error cleaning up corrupted resource {}", resourceFile, th);
                 ExitUtil.halt(ExitUtil.EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES);
@@ -210,7 +204,9 @@
     @Override
     public void delete(String relativePath) throws HyracksDataException {
         FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
-        boolean resourceExists = resourceFile.getFile().exists();
+        final LocalResource localResource = readLocalResource(resourceFile);
+
+        boolean resourceExists = localResource != null;
         if (isReplicationEnabled && resourceExists) {
             try {
                 createReplicationJob(ReplicationOperation.DELETE, resourceFile);
@@ -221,8 +217,7 @@
         synchronized (this) {
             try {
                 if (resourceExists) {
-                    final LocalResource localResource = readLocalResource(resourceFile.getFile());
-                    IoUtil.delete(resourceFile);
+                    ioManager.delete(resourceFile);
                     // delete all checkpoints
                     indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
                 } else {
@@ -241,18 +236,15 @@
         return ioManager.resolve(fileName);
     }
 
-    public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<Path> roots)
-            throws HyracksDataException {
+    public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter,
+            List<FileReference> roots) throws HyracksDataException {
         Map<Long, LocalResource> resourcesMap = new HashMap<>();
-        for (Path root : roots) {
-            if (!Files.exists(root) || !Files.isDirectory(root)) {
-                continue;
-            }
-            final Collection<File> files = IoUtil.getMatchingFiles(root, METADATA_FILES_FILTER);
+        for (FileReference root : roots) {
+            final Collection<FileReference> files = ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
             try {
-                for (File file : files) {
+                for (FileReference file : files) {
                     final LocalResource localResource = readLocalResource(file);
-                    if (filter.test(localResource)) {
+                    if (localResource != null && filter.test(localResource)) {
                         resourcesMap.put(localResource.getId(), localResource);
                     }
                 }
@@ -270,7 +262,7 @@
 
     public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
             throws HyracksDataException {
-        List<Path> partitionsRoots = new ArrayList<>();
+        List<FileReference> partitionsRoots = new ArrayList<>();
         for (Integer partition : partitions) {
             partitionsRoots.add(getPartitionRoot(partition));
         }
@@ -278,14 +270,15 @@
     }
 
     public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException {
-        for (Path root : storageRoots) {
-            final Collection<File> files = IoUtil.getMatchingFiles(root, METADATA_FILES_FILTER);
+        for (FileReference root : storageRoots) {
+            final Collection<FileReference> files = ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
             try {
-                for (File file : files) {
+                for (FileReference file : files) {
                     final LocalResource localResource = readLocalResource(file);
-                    if (filter.test(localResource)) {
-                        LOGGER.warn("deleting invalid metadata index {}", file.getParentFile());
-                        IoUtil.delete(file.getParentFile());
+                    if (localResource != null && filter.test(localResource)) {
+                        FileReference parent = file.getParent();
+                        LOGGER.warn("deleting invalid metadata index {}", parent);
+                        ioManager.delete(parent);
                     }
                 }
             } catch (IOException e) {
@@ -319,10 +312,14 @@
                 : (path + File.separator + StorageConstants.METADATA_FILE_NAME);
     }
 
-    private LocalResource readLocalResource(File file) throws HyracksDataException {
-        final Path path = Paths.get(file.getAbsolutePath());
+    private LocalResource readLocalResource(FileReference fileRef) throws HyracksDataException {
+        byte[] bytes = ioManager.readAllBytes(fileRef);
+        if (bytes == null) {
+            return null;
+        }
+
         try {
-            final JsonNode jsonNode = OBJECT_MAPPER.readValue(Files.readAllBytes(path), JsonNode.class);
+            final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes, JsonNode.class);
             LocalResource resource = (LocalResource) persistedResourceRegistry.deserialize(jsonNode);
             if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) {
                 return resource;
@@ -358,15 +355,10 @@
 
     /**
      * Deletes physical files of all data verses.
-     *
-     * @throws IOException
      */
-    public synchronized void deleteStorageData() throws IOException {
-        for (Path root : storageRoots) {
-            final File rootFile = root.toFile();
-            if (rootFile.exists()) {
-                FileUtils.deleteDirectory(rootFile);
-            }
+    public synchronized void deleteStorageData() throws HyracksDataException {
+        for (FileReference root : storageRoots) {
+            ioManager.deleteDirectory(root);
         }
         createStorageRoots();
     }
@@ -392,15 +384,15 @@
      * @return The set of indexes files
      * @throws HyracksDataException
      */
-    public synchronized Set<File> getPartitionIndexes(int partition) throws HyracksDataException {
-        Path partitionRoot = getPartitionRoot(partition);
+    public synchronized Set<FileReference> getPartitionIndexes(int partition) throws HyracksDataException {
+        FileReference partitionRoot = getPartitionRoot(partition);
         final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> {
             DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
             return dsResource.getPartition() == partition;
         }, Collections.singletonList(partitionRoot));
-        Set<File> indexes = new HashSet<>();
+        Set<FileReference> indexes = new HashSet<>();
         for (LocalResource localResource : partitionResourcesMap.values()) {
-            indexes.add(ioManager.resolve(localResource.getPath()).getFile());
+            indexes.add(ioManager.resolve(localResource.getPath()));
         }
         return indexes;
     }
@@ -426,15 +418,15 @@
     public synchronized List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
             throws HyracksDataException {
         final List<String> partitionReplicatedFiles = new ArrayList<>();
-        final Set<File> replicatedIndexes = new HashSet<>();
+        final Set<FileReference> replicatedIndexes = new HashSet<>();
         final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
         for (LocalResource lr : partitionResources.values()) {
             DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
             if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
-                replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile());
+                replicatedIndexes.add(ioManager.resolve(lr.getPath()));
             }
         }
-        for (File indexDir : replicatedIndexes) {
+        for (FileReference indexDir : replicatedIndexes) {
             partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
         }
         return partitionReplicatedFiles;
@@ -455,31 +447,23 @@
         return maxComponentId;
     }
 
-    private List<String> getIndexFiles(File indexDir) {
+    private List<String> getIndexFiles(FileReference indexDir) throws HyracksDataException {
         final List<String> indexFiles = new ArrayList<>();
-        if (indexDir.isDirectory()) {
-            File[] indexFilteredFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
-            if (indexFilteredFiles != null) {
-                Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add);
-            }
-        }
+        Collection<FileReference> indexFilteredFiles = ioManager.getMatchingFiles(indexDir, LSM_INDEX_FILES_FILTER);
+        indexFilteredFiles.stream().map(FileReference::getAbsolutePath).forEach(indexFiles::add);
         return indexFiles;
     }
 
     private void createStorageRoots() {
-        for (Path root : storageRoots) {
-            try {
-                Files.createDirectories(root);
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to create storage root directory at " + root, e);
-            }
+        for (FileReference root : storageRoots) {
+            ioManager.makeDirectories(root);
         }
     }
 
     public synchronized void cleanup(int partition) throws HyracksDataException {
-        final Set<File> partitionIndexes = getPartitionIndexes(partition);
+        final Set<FileReference> partitionIndexes = getPartitionIndexes(partition);
         try {
-            for (File index : partitionIndexes) {
+            for (FileReference index : partitionIndexes) {
                 deleteIndexMaskedFiles(index);
                 if (isValidIndex(index)) {
                     deleteIndexInvalidComponents(index);
@@ -504,30 +488,27 @@
     }
 
     public synchronized void deleteCorruptedResources() throws HyracksDataException {
-        for (Path root : storageRoots) {
-            final Collection<File> metadataMaskFiles = IoUtil.getMatchingFiles(root, METADATA_MASK_FILES_FILTER);
-            for (File metadataMaskFile : metadataMaskFiles) {
-                final File resourceFile = new File(metadataMaskFile.getParent(), METADATA_FILE_NAME);
-                if (resourceFile.exists()) {
-                    IoUtil.delete(resourceFile);
-                }
-                IoUtil.delete(metadataMaskFile);
+        for (FileReference root : storageRoots) {
+            final Collection<FileReference> metadataMaskFiles =
+                    ioManager.getMatchingFiles(root, METADATA_MASK_FILES_FILTER);
+            for (FileReference metadataMaskFile : metadataMaskFiles) {
+                final FileReference resourceFile = metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
+                ioManager.delete(resourceFile);
+                ioManager.delete(metadataMaskFile);
             }
         }
     }
 
-    private void deleteIndexMaskedFiles(File index) throws IOException {
-        File[] masks = index.listFiles(MASK_FILES_FILTER);
-        if (masks != null) {
-            for (File mask : masks) {
-                deleteIndexMaskedFiles(index, mask);
-                // delete the mask itself
-                Files.delete(mask.toPath());
-            }
+    private void deleteIndexMaskedFiles(FileReference index) throws IOException {
+        Collection<FileReference> masks = ioManager.getMatchingFiles(index, MASK_FILES_FILTER);
+        for (FileReference mask : masks) {
+            deleteIndexMaskedFiles(index, mask);
+            // delete the mask itself
+            ioManager.delete(mask);
         }
     }
 
-    private boolean isValidIndex(File index) throws IOException {
+    private boolean isValidIndex(FileReference index) throws IOException {
         // any index without any checkpoint files is invalid
         // this can happen if a crash happens when the index metadata file is created
         // but before the initial checkpoint is persisted. The index metadata file will
@@ -535,46 +516,46 @@
         return getIndexCheckpointManager(index).getCheckpointCount() != 0;
     }
 
-    private void deleteIndexInvalidComponents(File index) throws IOException, ParseException {
-        final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
+    private void deleteIndexInvalidComponents(FileReference index) throws IOException, ParseException {
+        final Collection<FileReference> indexComponentFiles = ioManager.getMatchingFiles(index, COMPONENT_FILES_FILTER);
         if (indexComponentFiles == null) {
             throw new IOException(index + " doesn't exist or an IO error occurred");
         }
         final long validComponentSequence = getIndexCheckpointManager(index).getValidComponentSequence();
-        for (File componentFile : indexComponentFiles) {
+        for (FileReference componentFileRef : indexComponentFiles) {
             // delete any file with start or end sequence > valid component sequence
-            final long fileStart = IndexComponentFileReference.of(componentFile.getName()).getSequenceStart();
-            final long fileEnd = IndexComponentFileReference.of(componentFile.getName()).getSequenceEnd();
+            final long fileStart = IndexComponentFileReference.of(componentFileRef.getName()).getSequenceStart();
+            final long fileEnd = IndexComponentFileReference.of(componentFileRef.getName()).getSequenceEnd();
             if (fileStart > validComponentSequence || fileEnd > validComponentSequence) {
-                LOGGER.warn(() -> "Deleting invalid component file " + componentFile.getAbsolutePath()
+                LOGGER.warn(() -> "Deleting invalid component file " + componentFileRef.getAbsolutePath()
                         + " based on valid sequence " + validComponentSequence);
-                Files.delete(componentFile.toPath());
+                ioManager.delete(componentFileRef);
             }
         }
     }
 
-    private IIndexCheckpointManager getIndexCheckpointManager(File index) throws HyracksDataException {
-        final String indexFile = Paths.get(index.getAbsolutePath(), StorageConstants.METADATA_FILE_NAME).toString();
+    private IIndexCheckpointManager getIndexCheckpointManager(FileReference index) throws HyracksDataException {
+        final String indexFile = index.getChild(METADATA_FILE_NAME).getAbsolutePath();
         final ResourceReference indexRef = ResourceReference.of(indexFile);
         return indexCheckpointManagerProvider.get(indexRef);
     }
 
-    private void deleteIndexMaskedFiles(File index, File mask) throws IOException {
-        if (!mask.getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
+    private void deleteIndexMaskedFiles(FileReference index, FileReference mask) throws IOException {
+        if (!mask.getFile().getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
             throw new IllegalArgumentException("Unrecognized mask file: " + mask);
         }
-        File[] maskedFiles;
+        Collection<FileReference> maskedFiles;
         if (isComponentMask(mask)) {
             final String componentId = mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length());
-            maskedFiles = index.listFiles((dir, name) -> name.startsWith(componentId));
+            maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> name.startsWith(componentId));
         } else {
             final String maskedFileName = mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length());
-            maskedFiles = index.listFiles((dir, name) -> name.equals(maskedFileName));
+            maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> name.equals(maskedFileName));
         }
         if (maskedFiles != null) {
-            for (File maskedFile : maskedFiles) {
+            for (FileReference maskedFile : maskedFiles) {
                 LOGGER.info(() -> "deleting masked file: " + maskedFile.getAbsolutePath());
-                Files.delete(maskedFile.toPath());
+                ioManager.delete(maskedFile);
             }
         }
     }
@@ -583,13 +564,13 @@
         try {
             final FileReference resolvedPath = ioManager.resolve(resource.getRelativePath().toString());
             long totalSize = 0;
-            final File[] indexFiles = resolvedPath.getFile().listFiles();
+            final Collection<FileReference> indexFiles = ioManager.list(resolvedPath);
             final Map<String, Long> componentsStats = new HashMap<>();
             if (indexFiles != null) {
-                for (File file : indexFiles) {
-                    long fileSize = file.length();
+                for (FileReference file : indexFiles) {
+                    long fileSize = ioManager.getSize(file);
                     totalSize += fileSize;
-                    if (isComponentFile(resolvedPath.getFile(), file.getName())) {
+                    if (isComponentFile(resolvedPath, file.getName())) {
                         String componentSeq = getComponentSequence(file.getAbsolutePath());
                         componentsStats.put(componentSeq, componentsStats.getOrDefault(componentSeq, 0L) + fileSize);
                     }
@@ -621,73 +602,69 @@
     }
 
     private void createResourceFileMask(FileReference resourceFile) throws HyracksDataException {
-        Path maskFile = getResourceMaskFilePath(resourceFile);
+        FileReference maskFile = getResourceMaskFilePath(resourceFile);
         try {
-            Files.createFile(maskFile);
+            ioManager.create(maskFile);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
     }
 
     private void deleteResourceFileMask(FileReference resourceFile) throws HyracksDataException {
-        Path maskFile = getResourceMaskFilePath(resourceFile);
-        IoUtil.delete(maskFile);
+        FileReference maskFile = getResourceMaskFilePath(resourceFile);
+        ioManager.delete(maskFile);
     }
 
-    private Path getResourceMaskFilePath(FileReference resourceFile) {
-        return Paths.get(resourceFile.getFile().getParentFile().getAbsolutePath(), METADATA_FILE_MASK_NAME);
+    private FileReference getResourceMaskFilePath(FileReference resourceFile) {
+        FileReference resourceFileParent = resourceFile.getParent();
+        return resourceFileParent.getChild(METADATA_FILE_MASK_NAME);
     }
 
-    private static boolean isComponentMask(File mask) {
+    private static boolean isComponentMask(FileReference mask) {
         return mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX);
     }
 
-    private static boolean isComponentFile(File indexDir, String fileName) {
-        return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
+    private static boolean isComponentFile(FileReference indexDir, String fileName) {
+        return COMPONENT_FILES_FILTER.accept(indexDir.getFile(), fileName);
     }
 
-    public List<Path> getStorageRoots() {
-        return storageRoots;
-    }
-
-    public synchronized void keepPartitions(Set<Integer> keepPartitions) {
-        List<File> onDiskPartitions = getOnDiskPartitions();
-        for (File onDiskPartition : onDiskPartitions) {
+    public synchronized void keepPartitions(Set<Integer> keepPartitions) throws HyracksDataException {
+        List<FileReference> onDiskPartitions = getOnDiskPartitions();
+        for (FileReference onDiskPartition : onDiskPartitions) {
             int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
             if (!keepPartitions.contains(partitionNum)) {
                 LOGGER.warn("deleting partition {} since it is not on partitions to keep {}", partitionNum,
                         keepPartitions);
-                FileUtils.deleteQuietly(onDiskPartition);
+                ioManager.delete(onDiskPartition);
             }
         }
     }
 
-    public synchronized List<File> getOnDiskPartitions() {
-        List<File> onDiskPartitions = new ArrayList<>();
-        for (Path root : storageRoots) {
-            File[] partitions = root.toFile().listFiles(
+    public synchronized List<FileReference> getOnDiskPartitions() throws HyracksDataException {
+        List<FileReference> onDiskPartitions = new ArrayList<>();
+        for (FileReference root : storageRoots) {
+            Collection<FileReference> partitions = ioManager.list(root,
                     (dir, name) -> dir.isDirectory() && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
             if (partitions != null) {
-                onDiskPartitions.addAll(Arrays.asList(partitions));
+                onDiskPartitions.addAll(partitions);
             }
         }
         return onDiskPartitions;
     }
 
-    public Path getPartitionRoot(int partition) throws HyracksDataException {
-        Path path =
-                Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partition);
-        FileReference resolve = ioManager.resolve(path.toString());
-        return resolve.getFile().toPath();
+    public FileReference getPartitionRoot(int partition) throws HyracksDataException {
+        String path = StorageConstants.STORAGE_ROOT_DIR_NAME + File.separator + StorageConstants.PARTITION_DIR_PREFIX
+                + partition;
+        return ioManager.resolve(path);
     }
 
-    public void deletePartition(int partitionId) {
-        List<File> onDiskPartitions = getOnDiskPartitions();
-        for (File onDiskPartition : onDiskPartitions) {
+    public void deletePartition(int partitionId) throws HyracksDataException {
+        Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
+        for (FileReference onDiskPartition : onDiskPartitions) {
             int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
             if (partitionNum == partitionId) {
                 LOGGER.warn("deleting partition {}", partitionNum);
-                FileUtils.deleteQuietly(onDiskPartition);
+                ioManager.delete(onDiskPartition);
                 return;
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
index cec1598..c826ed7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
@@ -97,6 +97,10 @@
         return path + File.separator + name;
     }
 
+    public String getName() {
+        return file.getName();
+    }
+
     public void register() {
         if (registrationTime != 0) {
             throw new IllegalStateException(
@@ -125,7 +129,7 @@
         if (parentIndex < 0) {
             return new FileReference(dev, "");
         }
-        String parentPath = path.substring(parentIndex);
+        String parentPath = path.substring(0, parentIndex);
         return new FileReference(dev, parentPath);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 5f93c21..07e1c78 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -67,6 +67,8 @@
 
     long getSize(IFileHandle fileHandle);
 
+    long getSize(FileReference fileReference);
+
     WritableByteChannel newWritableChannel(IFileHandle fileHandle);
 
     void deleteWorkspaceFiles() throws HyracksDataException;
@@ -134,11 +136,16 @@
 
     void deleteDirectory(FileReference root) throws HyracksDataException;
 
+    // TODO: Remove and use list
     Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter) throws HyracksDataException;
 
     boolean exists(FileReference fileRef);
 
     void create(FileReference fileRef) throws HyracksDataException;
 
+    boolean makeDirectories(FileReference resourceDir);
+
+    void cleanDirectory(FileReference resourceDir) throws HyracksDataException;
+
     void syncFiles(Set<Integer> activePartitions) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index 6c74838..8b3fdec 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -92,6 +92,14 @@
         return raf.getChannel();
     }
 
+    public void setLength(long newLength) throws HyracksDataException {
+        try {
+            raf.setLength(newLength);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
     public synchronized void ensureOpen() throws HyracksDataException {
         if (raf == null || !raf.getChannel().isOpen()) {
             try {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 4eebf6b..24d1061 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -400,7 +400,12 @@
 
     @Override
     public long getSize(IFileHandle fileHandle) {
-        return fileHandle.getFileReference().getFile().length();
+        return getSize(fileHandle.getFileReference());
+    }
+
+    @Override
+    public long getSize(FileReference fileReference) {
+        return fileReference.getFile().length();
     }
 
     @Override
@@ -577,7 +582,7 @@
     public Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter)
             throws HyracksDataException {
         File rootFile = root.getFile();
-        if (!rootFile.exists()) {
+        if (!rootFile.exists() || !rootFile.isDirectory()) {
             return Collections.emptyList();
         }
 
@@ -601,6 +606,20 @@
     }
 
     @Override
+    public boolean makeDirectories(FileReference resourceDir) {
+        return resourceDir.getFile().mkdirs();
+    }
+
+    @Override
+    public void cleanDirectory(FileReference resourceDir) throws HyracksDataException {
+        try {
+            FileUtils.cleanDirectory(resourceDir.getFile());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
     public void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
         try {
             FileUtils.copyDirectory(srcFileRef.getFile(), destFileRef.getFile());
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 91316dc..08eba0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -61,15 +61,17 @@
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
         String baseName = getNextComponentSequence(BTREE_FILTER);
-        return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null,
-                hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
+        return new LSMComponentFileReferences(getCompressedFileReferenceIfAny(baseName + DELIMITER + BTREE_SUFFIX),
+                null,
+                hasBloomFilter ? getCompressedFileReferenceIfAny(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
     }
 
     @Override
     public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
         final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
-        return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null,
-                hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
+        return new LSMComponentFileReferences(getCompressedFileReferenceIfAny(baseName + DELIMITER + BTREE_SUFFIX),
+                null,
+                hasBloomFilter ? getCompressedFileReferenceIfAny(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 4da207d..610232f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -25,6 +25,7 @@
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hyracks.api.compression.ICompressorDecompressor;
 import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
@@ -32,7 +33,6 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
@@ -139,9 +139,9 @@
     protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
             TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles,
             IBufferCache bufferCache) throws HyracksDataException {
-        String[] files = listDirFiles(baseDir, filter);
-        for (String fileName : files) {
-            FileReference fileRef = getFileReference(fileName);
+        Set<FileReference> files = ioManager.list(baseDir, filter);
+        for (FileReference filePath : files) {
+            FileReference fileRef = getCompressedFileReferenceIfAny(filePath.getName());
             if (treeFactory == null) {
                 allFiles.add(IndexComponentFileReference.of(fileRef));
                 continue;
@@ -155,24 +155,6 @@
         }
     }
 
-    static String[] listDirFiles(FileReference dir, FilenameFilter filter) throws HyracksDataException {
-        /*
-         * Returns null if this abstract pathname does not denote a directory, or if an I/O error occurs.
-         */
-        String[] files = dir.getFile().list(filter);
-        if (files == null) {
-            if (!dir.getFile().canRead()) {
-                throw HyracksDataException.create(ErrorCode.CANNOT_READ_FILE, dir);
-            } else if (!dir.getFile().exists()) {
-                throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, dir);
-            } else if (!dir.getFile().isDirectory()) {
-                throw HyracksDataException.create(ErrorCode.FILE_IS_NOT_DIRECTORY, dir);
-            }
-            throw HyracksDataException.create(ErrorCode.UNIDENTIFIED_IO_ERROR_READING_FILE, dir);
-        }
-        return files;
-    }
-
     protected void validateFiles(HashSet<String> groundTruth, ArrayList<IndexComponentFileReference> validFiles,
             FilenameFilter filter, TreeIndexFactory<? extends ITreeIndex> treeFactory, IBufferCache bufferCache)
             throws HyracksDataException {
@@ -189,15 +171,15 @@
 
     @Override
     public void createDirs() throws HyracksDataException {
-        if (baseDir.getFile().exists()) {
+        if (ioManager.exists(baseDir)) {
             throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_EXISTING_INDEX);
         }
-        baseDir.getFile().mkdirs();
+        ioManager.makeDirectories(baseDir);
     }
 
     @Override
     public void deleteDirs() throws HyracksDataException {
-        IoUtil.delete(baseDir);
+        ioManager.deleteDirectory(baseDir);
     }
 
     @Override
@@ -314,7 +296,7 @@
         return IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
     }
 
-    protected FileReference getFileReference(String name) {
+    protected FileReference getCompressedFileReferenceIfAny(String name) {
         final ICompressorDecompressor compDecomp = compressorDecompressorFactory.createInstance();
         //Avoid creating LAF file for NoOpCompressorDecompressor
         if (compDecomp != NoOpCompressorDecompressor.INSTANCE && isCompressible(name)) {
@@ -331,9 +313,9 @@
 
     private long getOnDiskLastUsedComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
         long maxComponentSeq = -1;
-        final String[] files = listDirFiles(baseDir, filenameFilter);
-        for (String fileName : files) {
-            maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(fileName).getSequenceEnd());
+        final Set<FileReference> files = ioManager.list(baseDir, filenameFilter);
+        for (FileReference file : files) {
+            maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(file).getSequenceEnd());
         }
         return maxComponentSeq;
     }