[ASTERIXDB-3184][STO] Unify I/O APIs
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Unify file operations by using IIOManager
Change-Id: Ifb77c24ee855537bcf725b2eb1e28b1af200f3ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17524
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 290734f..2ed1638 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -18,15 +18,11 @@
*/
package org.apache.asterix.app.nc;
-import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -34,9 +30,11 @@
import org.apache.asterix.common.storage.IndexCheckpoint;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.util.annotations.ThreadSafe;
-import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -49,10 +47,12 @@
private static final FilenameFilter CHECKPOINT_FILE_FILTER =
(file, name) -> name.startsWith(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX);
private static final long BULKLOAD_LSN = 0;
- private final Path indexPath;
+ private final FileReference indexPath;
+ private final IIOManager ioManager;
- public IndexCheckpointManager(Path indexPath) {
+ public IndexCheckpointManager(FileReference indexPath, IIOManager ioManager) {
this.indexPath = indexPath;
+ this.ioManager = ioManager;
}
@Override
@@ -154,7 +154,7 @@
}
if (checkpoints.isEmpty()) {
LOGGER.warn("Couldn't find any checkpoint file for index {}. Content of dir are {}.", indexPath,
- Arrays.toString(indexPath.toFile().listFiles()));
+ ioManager.getMatchingFiles(indexPath, IoUtil.NO_OP_FILTER).toString());
throw new IllegalStateException("Couldn't find any checkpoints for resource: " + indexPath);
}
checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
@@ -180,13 +180,13 @@
}
}
- private List<IndexCheckpoint> getCheckpoints() throws ClosedByInterruptException {
+ private List<IndexCheckpoint> getCheckpoints() throws ClosedByInterruptException, HyracksDataException {
List<IndexCheckpoint> checkpoints = new ArrayList<>();
- final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
- if (checkpointFiles != null) {
- for (File checkpointFile : checkpointFiles) {
+ final Collection<FileReference> checkpointFiles = ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+ if (!checkpointFiles.isEmpty()) {
+ for (FileReference checkpointFile : checkpointFiles) {
try {
- checkpoints.add(read(checkpointFile.toPath()));
+ checkpoints.add(read(checkpointFile));
} catch (ClosedByInterruptException e) {
throw e;
} catch (IOException e) {
@@ -198,14 +198,14 @@
}
private void persist(IndexCheckpoint checkpoint) throws HyracksDataException {
- final Path checkpointPath = getCheckpointPath(checkpoint);
+ final FileReference checkpointPath = getCheckpointPath(checkpoint);
for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
try {
// clean up from previous write failure
- if (checkpointPath.toFile().exists()) {
- Files.delete(checkpointPath);
+ if (ioManager.exists(checkpointPath)) {
+ ioManager.delete(checkpointPath);
}
- FileUtil.writeAndForce(checkpointPath, checkpoint.asJson().getBytes());
+ ioManager.overwrite(checkpointPath, checkpoint.asJson().getBytes());
// ensure it was written correctly by reading it
read(checkpointPath);
return;
@@ -223,17 +223,18 @@
}
}
- private IndexCheckpoint read(Path checkpointPath) throws IOException {
- return IndexCheckpoint.fromJson(new String(Files.readAllBytes(checkpointPath)));
+ private IndexCheckpoint read(FileReference checkpointPath) throws IOException {
+ return IndexCheckpoint.fromJson(new String(ioManager.readAllBytes(checkpointPath)));
}
private void deleteHistory(long latestId, int historyToKeep) {
try {
- final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
- if (checkpointFiles != null) {
- for (File checkpointFile : checkpointFiles) {
- if (getCheckpointIdFromFileName(checkpointFile.toPath()) < (latestId - historyToKeep)) {
- Files.delete(checkpointFile.toPath());
+ final Collection<FileReference> checkpointFiles =
+ ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+ if (!checkpointFiles.isEmpty()) {
+ for (FileReference checkpointFile : checkpointFiles) {
+ if (getCheckpointIdFromFileName(checkpointFile) < (latestId - historyToKeep)) {
+ ioManager.delete(checkpointFile);
}
}
}
@@ -242,13 +243,12 @@
}
}
- private Path getCheckpointPath(IndexCheckpoint checkpoint) {
- return Paths.get(indexPath.toString(),
- StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX + String.valueOf(checkpoint.getId()));
+ private FileReference getCheckpointPath(IndexCheckpoint checkpoint) {
+ return indexPath.getChild(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX + checkpoint.getId());
}
- private long getCheckpointIdFromFileName(Path checkpointPath) {
- return Long.valueOf(checkpointPath.getFileName().toString()
- .substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
+ private long getCheckpointIdFromFileName(FileReference checkpointPath) {
+ return Long
+ .parseLong(checkpointPath.getName().substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
index e0b3105..1e08ed8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.app.nc;
-import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
@@ -27,6 +26,7 @@
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
public class IndexCheckpointManagerProvider implements IIndexCheckpointManagerProvider {
@@ -54,8 +54,8 @@
private IndexCheckpointManager create(ResourceReference ref) {
try {
- final Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
- return new IndexCheckpointManager(indexPath);
+ final FileReference indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
+ return new IndexCheckpointManager(indexPath, ioManager);
} catch (HyracksDataException e) {
throw new IllegalStateException(e);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
index 6f4ce69..97d8d80 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
@@ -18,7 +18,8 @@
*/
package org.apache.asterix.cloud;
-import static org.apache.asterix.common.utils.StorageConstants.*;
+import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
import java.io.File;
import java.io.FilenameFilter;
@@ -151,6 +152,7 @@
super.close(fHandle);
}
+ // TODO This method should not do any syncing. It simply should list the files
@Override
public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
@@ -223,6 +225,14 @@
}
@Override
+ public long getSize(FileReference fileReference) {
+ if (!fileReference.getFile().exists()) {
+ return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
+ }
+ return super.getSize(fileReference);
+ }
+
+ @Override
public void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException {
super.overwrite(fileRef, bytes);
// Write here will overwrite the older object if exists
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 1189b51..999636d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -21,8 +21,6 @@
import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
@@ -77,7 +75,7 @@
private long firstLsnForCurrentMemoryComponent = 0L;
private long persistenceLsn = 0L;
private int pendingFlushes = 0;
- private Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
+ private final Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId componentId,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
@@ -307,10 +305,8 @@
private static FileReference getOperationMaskFilePath(ILSMIOOperation operation) {
FileReference target = operation.getTarget();
- final String componentSequence = getComponentSequence(target.getFile().getAbsolutePath());
- Path idxRelPath = Paths.get(target.getRelativePath()).getParent();
- Path maskFileRelPath =
- Paths.get(idxRelPath.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
- return new FileReference(target.getDeviceHandle(), maskFileRelPath.toString());
+ String componentSequence = getComponentSequence(target.getFile().getAbsolutePath());
+ FileReference idxRelPath = target.getParent();
+ return idxRelPath.getChild(StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index a9ed066..9702b18 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -19,7 +19,6 @@
package org.apache.asterix.common.utils;
import java.io.File;
-import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
@@ -31,6 +30,7 @@
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.DefaultIoDeviceFileSplit;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.MappedFileSplit;
@@ -180,7 +180,7 @@
* @return
* @throws HyracksDataException
*/
- public static Path getIndexPath(IIOManager ioManager, ResourceReference ref) throws HyracksDataException {
- return ioManager.resolve(ref.getRelativePath().toString()).getFile().toPath();
+ public static FileReference getIndexPath(IIOManager ioManager, ResourceReference ref) throws HyracksDataException {
+ return ioManager.resolve(ref.getRelativePath().toString());
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 97b6556..a6ba0e2 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -24,7 +24,6 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.file.Path;
import java.util.Collection;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -36,6 +35,7 @@
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
@@ -69,14 +69,15 @@
DatasetResourceReference ref = DatasetResourceReference.of(ls);
final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref);
// Get most recent sequence of existing files to avoid deletion
- Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
- String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+ FileReference indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
+ Collection<FileReference> files =
+ ioManager.getMatchingFiles(indexPath, AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
if (files == null) {
throw HyracksDataException
.create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
}
long maxComponentSequence = UNINITIALIZED_COMPONENT_SEQ;
- for (String file : files) {
+ for (FileReference file : files) {
maxComponentSequence =
Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index 3f04bd2..e9af85c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -22,9 +22,6 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
@@ -50,21 +47,22 @@
@Override
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
try {
+ IIOManager ioManager = appCtx.getIoManager();
// create mask
- final Path maskPath = getComponentMaskPath(appCtx, file);
- Files.createFile(maskPath);
+ final FileReference maskPath = getComponentMaskPath(ioManager, file);
+ ioManager.create(maskPath);
ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
} catch (IOException e) {
throw new ReplicationException(e);
}
}
- public static Path getComponentMaskPath(INcApplicationContext appCtx, String componentFile) throws IOException {
- final IIOManager ioManager = appCtx.getIoManager();
+ public static FileReference getComponentMaskPath(IIOManager ioManager, String componentFile) throws IOException {
final FileReference localPath = ioManager.resolve(componentFile);
- final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
+ final FileReference resourceDir = localPath.getParent();
+ ioManager.makeDirectories(resourceDir);
final String componentSequence = ResourceReference.getComponentSequence(componentFile);
- return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
+ return resourceDir.getChild(StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
}
@Override
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
index 92e4989..a00acfb 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -20,10 +20,8 @@
import java.io.DataInput;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.file.Files;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
@@ -32,6 +30,7 @@
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,9 +51,9 @@
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
try {
final IIOManager ioManager = appCtx.getIoManager();
- final File localFile = ioManager.resolve(file).getFile();
- if (localFile.exists()) {
- Files.delete(localFile.toPath());
+ final FileReference localFile = ioManager.resolve(file);
+ if (ioManager.exists(localFile)) {
+ ioManager.delete(localFile);
ResourceReference replicaRes = ResourceReference.of(localFile.getAbsolutePath());
if (replicaRes.isMetadataResource()) {
((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
index 483561b..2b0e2d8 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
@@ -20,7 +20,6 @@
import java.io.DataInput;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
@@ -31,8 +30,8 @@
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,10 +51,10 @@
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
try {
final IIOManager ioManager = appCtx.getIoManager();
- final File indexFile = ioManager.resolve(file).getFile();
- if (indexFile.exists()) {
- File indexDir = indexFile.getParentFile();
- IoUtil.delete(indexDir);
+ final FileReference indexFile = ioManager.resolve(file);
+ if (ioManager.exists(indexFile)) {
+ FileReference indexDir = indexFile.getParent();
+ ioManager.deleteDirectory(indexDir);
((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
.invalidateResource(ResourceReference.of(file).getRelativePath().toString());
LOGGER.info(() -> "Deleted index: " + indexFile.getAbsolutePath());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index fa77378..76fde09 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -22,8 +22,6 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -36,6 +34,8 @@
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.logging.log4j.LogManager;
@@ -67,9 +67,10 @@
} else if (masterLsn != IndexSynchronizer.MERGE_LSN) {
ensureComponentLsnFlushed(appCtx);
}
+ IIOManager ioManager = appCtx.getIoManager();
// delete mask
- final Path maskPath = ComponentMaskTask.getComponentMaskPath(appCtx, file);
- Files.delete(maskPath);
+ final FileReference maskPath = ComponentMaskTask.getComponentMaskPath(ioManager, file);
+ ioManager.delete(maskPath);
ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
} catch (IOException | InterruptedException e) {
throw new ReplicationException(e);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 500a5de..71ed63e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -24,11 +24,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
@@ -40,10 +36,10 @@
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.FileHandle;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -73,31 +69,34 @@
final IIOManager ioManager = appCtx.getIoManager();
// resolve path
final FileReference localPath = ioManager.resolve(file);
- final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
+ FileReference resourceDir = localPath.getParent();
+ ioManager.makeDirectories(resourceDir);
if (indexMetadata) {
// ensure clean index directory
- FileUtils.cleanDirectory(resourceDir.toFile());
+ ioManager.cleanDirectory(resourceDir);
((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
.invalidateResource(ResourceReference.of(file).getRelativePath().toString());
}
// create mask
- final Path maskPath = Paths.get(resourceDir.toString(),
- StorageConstants.MASK_FILE_PREFIX + localPath.getFile().getName());
- Files.createFile(maskPath);
+ final FileReference maskPath =
+ resourceDir.getChild(StorageConstants.MASK_FILE_PREFIX + localPath.getName());
+ ioManager.create(maskPath);
// receive actual file
- final Path filePath = Paths.get(resourceDir.toString(), localPath.getFile().getName());
- Files.createFile(filePath);
- try (RandomAccessFile fileOutputStream = new RandomAccessFile(filePath.toFile(), "rw");
- FileChannel fileChannel = fileOutputStream.getChannel()) {
- fileOutputStream.setLength(size);
+ ioManager.create(localPath);
+ FileHandle fileHandle = (FileHandle) ioManager.open(localPath, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try (FileChannel fileChannel = fileHandle.getFileChannel()) {
+ fileHandle.setLength(size);
NetworkingUtil.downloadFile(fileChannel, worker.getChannel());
- fileChannel.force(true);
+ ioManager.sync(fileHandle, true);
+ } finally {
+ ioManager.close(fileHandle);
}
if (indexMetadata) {
initIndexCheckpoint(appCtx);
}
//delete mask
- Files.delete(maskPath);
+ ioManager.delete(maskPath);
LOGGER.debug("received file {} from master", localPath);
ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
} catch (IOException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 1e81346..8ef55e8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -21,6 +21,7 @@
import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
import static org.apache.asterix.common.utils.StorageConstants.INDEX_NON_DATA_FILES_PREFIX;
import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;
@@ -28,12 +29,8 @@
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.text.ParseException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -44,7 +41,6 @@
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -59,7 +55,6 @@
import org.apache.asterix.common.storage.ResourceStorageStats;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
@@ -68,14 +63,12 @@
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.util.ExitUtil;
-import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -88,18 +81,20 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private static final String METADATA_FILE_MASK_NAME =
StorageConstants.MASK_FILE_PREFIX + StorageConstants.METADATA_FILE_NAME;
private static final FilenameFilter LSM_INDEX_FILES_FILTER =
(dir, name) -> name.startsWith(METADATA_FILE_NAME) || !name.startsWith(INDEX_NON_DATA_FILES_PREFIX);
private static final FilenameFilter MASK_FILES_FILTER =
(dir, name) -> name.startsWith(StorageConstants.MASK_FILE_PREFIX);
- private static final int MAX_CACHED_RESOURCES = 1000;
private static final FilenameFilter METADATA_FILES_FILTER =
(dir, name) -> name.equals(StorageConstants.METADATA_FILE_NAME);
private static final FilenameFilter METADATA_MASK_FILES_FILTER =
(dir, name) -> name.equals(METADATA_FILE_MASK_NAME);
+ private static final int MAX_CACHED_RESOURCES = 1000;
+
// Finals
private final IIOManager ioManager;
private final Cache<String, LocalResource> resourceCache;
@@ -107,7 +102,7 @@
private boolean isReplicationEnabled = false;
private Set<String> filesToBeReplicated;
private IReplicationManager replicationManager;
- private final List<Path> storageRoots;
+ private final List<FileReference> storageRoots;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
private final IPersistedResourceRegistry persistedResourceRegistry;
@@ -120,8 +115,7 @@
storageRoots = new ArrayList<>();
final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
for (int i = 0; i < ioDevices.size(); i++) {
- storageRoots.add(
- Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME));
+ storageRoots.add(new FileReference(ioDevices.get(i), STORAGE_ROOT_DIR_NAME));
}
createStorageRoots();
resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -131,7 +125,7 @@
public String toString() {
StringBuilder aString = new StringBuilder().append(PersistentLocalResourceRepository.class.getSimpleName())
.append(Character.LINE_SEPARATOR).append(ioManager.getClass().getSimpleName()).append(':')
- .append(Character.LINE_SEPARATOR).append(ioManager.toString()).append(Character.LINE_SEPARATOR)
+ .append(Character.LINE_SEPARATOR).append(ioManager).append(Character.LINE_SEPARATOR)
.append("Cached Resources:").append(Character.LINE_SEPARATOR);
resourceCache.asMap().forEach(
(key, value) -> aString.append(key).append("->").append(value).append(Character.LINE_SEPARATOR));
@@ -143,8 +137,8 @@
LocalResource resource = resourceCache.getIfPresent(relativePath);
if (resource == null) {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
- if (resourceFile.getFile().exists()) {
- resource = readLocalResource(resourceFile.getFile());
+ resource = readLocalResource(resourceFile);
+ if (resource != null) {
resourceCache.put(relativePath, resource);
}
}
@@ -162,15 +156,15 @@
throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
}
- final File parent = resourceFile.getFile().getParentFile();
- if (!parent.exists() && !parent.mkdirs()) {
+ final FileReference parent = resourceFile.getParent();
+ if (!ioManager.exists(parent) && !ioManager.makeDirectories(parent)) {
throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath());
}
// The next block should be all or nothing
try {
createResourceFileMask(resourceFile);
byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
- FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), bytes);
+ ioManager.overwrite(resourceFile, bytes);
indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(
UNINITIALIZED_COMPONENT_SEQ, 0, LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), null);
deleteResourceFileMask(resourceFile);
@@ -199,7 +193,7 @@
private void cleanup(FileReference resourceFile) {
if (resourceFile.getFile().exists()) {
try {
- IoUtil.delete(resourceFile);
+ ioManager.delete(resourceFile);
} catch (Throwable th) {
LOGGER.error("Error cleaning up corrupted resource {}", resourceFile, th);
ExitUtil.halt(ExitUtil.EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES);
@@ -210,7 +204,9 @@
@Override
public void delete(String relativePath) throws HyracksDataException {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
- boolean resourceExists = resourceFile.getFile().exists();
+ final LocalResource localResource = readLocalResource(resourceFile);
+
+ boolean resourceExists = localResource != null;
if (isReplicationEnabled && resourceExists) {
try {
createReplicationJob(ReplicationOperation.DELETE, resourceFile);
@@ -221,8 +217,7 @@
synchronized (this) {
try {
if (resourceExists) {
- final LocalResource localResource = readLocalResource(resourceFile.getFile());
- IoUtil.delete(resourceFile);
+ ioManager.delete(resourceFile);
// delete all checkpoints
indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
} else {
@@ -241,18 +236,15 @@
return ioManager.resolve(fileName);
}
- public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<Path> roots)
- throws HyracksDataException {
+ public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter,
+ List<FileReference> roots) throws HyracksDataException {
Map<Long, LocalResource> resourcesMap = new HashMap<>();
- for (Path root : roots) {
- if (!Files.exists(root) || !Files.isDirectory(root)) {
- continue;
- }
- final Collection<File> files = IoUtil.getMatchingFiles(root, METADATA_FILES_FILTER);
+ for (FileReference root : roots) {
+ final Collection<FileReference> files = ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
try {
- for (File file : files) {
+ for (FileReference file : files) {
final LocalResource localResource = readLocalResource(file);
- if (filter.test(localResource)) {
+ if (localResource != null && filter.test(localResource)) {
resourcesMap.put(localResource.getId(), localResource);
}
}
@@ -270,7 +262,7 @@
public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
throws HyracksDataException {
- List<Path> partitionsRoots = new ArrayList<>();
+ List<FileReference> partitionsRoots = new ArrayList<>();
for (Integer partition : partitions) {
partitionsRoots.add(getPartitionRoot(partition));
}
@@ -278,14 +270,15 @@
}
public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException {
- for (Path root : storageRoots) {
- final Collection<File> files = IoUtil.getMatchingFiles(root, METADATA_FILES_FILTER);
+ for (FileReference root : storageRoots) {
+ final Collection<FileReference> files = ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
try {
- for (File file : files) {
+ for (FileReference file : files) {
final LocalResource localResource = readLocalResource(file);
- if (filter.test(localResource)) {
- LOGGER.warn("deleting invalid metadata index {}", file.getParentFile());
- IoUtil.delete(file.getParentFile());
+ if (localResource != null && filter.test(localResource)) {
+ FileReference parent = file.getParent();
+ LOGGER.warn("deleting invalid metadata index {}", parent);
+ ioManager.delete(parent);
}
}
} catch (IOException e) {
@@ -319,10 +312,14 @@
: (path + File.separator + StorageConstants.METADATA_FILE_NAME);
}
- private LocalResource readLocalResource(File file) throws HyracksDataException {
- final Path path = Paths.get(file.getAbsolutePath());
+ private LocalResource readLocalResource(FileReference fileRef) throws HyracksDataException {
+ byte[] bytes = ioManager.readAllBytes(fileRef);
+ if (bytes == null) {
+ return null;
+ }
+
try {
- final JsonNode jsonNode = OBJECT_MAPPER.readValue(Files.readAllBytes(path), JsonNode.class);
+ final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes, JsonNode.class);
LocalResource resource = (LocalResource) persistedResourceRegistry.deserialize(jsonNode);
if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) {
return resource;
@@ -358,15 +355,10 @@
/**
* Deletes physical files of all data verses.
- *
- * @throws IOException
*/
- public synchronized void deleteStorageData() throws IOException {
- for (Path root : storageRoots) {
- final File rootFile = root.toFile();
- if (rootFile.exists()) {
- FileUtils.deleteDirectory(rootFile);
- }
+ public synchronized void deleteStorageData() throws HyracksDataException {
+ for (FileReference root : storageRoots) {
+ ioManager.deleteDirectory(root);
}
createStorageRoots();
}
@@ -392,15 +384,15 @@
* @return The set of indexes files
* @throws HyracksDataException
*/
- public synchronized Set<File> getPartitionIndexes(int partition) throws HyracksDataException {
- Path partitionRoot = getPartitionRoot(partition);
+ public synchronized Set<FileReference> getPartitionIndexes(int partition) throws HyracksDataException {
+ FileReference partitionRoot = getPartitionRoot(partition);
final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> {
DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
return dsResource.getPartition() == partition;
}, Collections.singletonList(partitionRoot));
- Set<File> indexes = new HashSet<>();
+ Set<FileReference> indexes = new HashSet<>();
for (LocalResource localResource : partitionResourcesMap.values()) {
- indexes.add(ioManager.resolve(localResource.getPath()).getFile());
+ indexes.add(ioManager.resolve(localResource.getPath()));
}
return indexes;
}
@@ -426,15 +418,15 @@
public synchronized List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
throws HyracksDataException {
final List<String> partitionReplicatedFiles = new ArrayList<>();
- final Set<File> replicatedIndexes = new HashSet<>();
+ final Set<FileReference> replicatedIndexes = new HashSet<>();
final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
for (LocalResource lr : partitionResources.values()) {
DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
- replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile());
+ replicatedIndexes.add(ioManager.resolve(lr.getPath()));
}
}
- for (File indexDir : replicatedIndexes) {
+ for (FileReference indexDir : replicatedIndexes) {
partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
}
return partitionReplicatedFiles;
@@ -455,31 +447,23 @@
return maxComponentId;
}
- private List<String> getIndexFiles(File indexDir) {
+ private List<String> getIndexFiles(FileReference indexDir) throws HyracksDataException {
final List<String> indexFiles = new ArrayList<>();
- if (indexDir.isDirectory()) {
- File[] indexFilteredFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
- if (indexFilteredFiles != null) {
- Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add);
- }
- }
+ Collection<FileReference> indexFilteredFiles = ioManager.getMatchingFiles(indexDir, LSM_INDEX_FILES_FILTER);
+ indexFilteredFiles.stream().map(FileReference::getAbsolutePath).forEach(indexFiles::add);
return indexFiles;
}
private void createStorageRoots() {
- for (Path root : storageRoots) {
- try {
- Files.createDirectories(root);
- } catch (IOException e) {
- throw new IllegalStateException("Failed to create storage root directory at " + root, e);
- }
+ for (FileReference root : storageRoots) {
+ ioManager.makeDirectories(root);
}
}
public synchronized void cleanup(int partition) throws HyracksDataException {
- final Set<File> partitionIndexes = getPartitionIndexes(partition);
+ final Set<FileReference> partitionIndexes = getPartitionIndexes(partition);
try {
- for (File index : partitionIndexes) {
+ for (FileReference index : partitionIndexes) {
deleteIndexMaskedFiles(index);
if (isValidIndex(index)) {
deleteIndexInvalidComponents(index);
@@ -504,30 +488,27 @@
}
public synchronized void deleteCorruptedResources() throws HyracksDataException {
- for (Path root : storageRoots) {
- final Collection<File> metadataMaskFiles = IoUtil.getMatchingFiles(root, METADATA_MASK_FILES_FILTER);
- for (File metadataMaskFile : metadataMaskFiles) {
- final File resourceFile = new File(metadataMaskFile.getParent(), METADATA_FILE_NAME);
- if (resourceFile.exists()) {
- IoUtil.delete(resourceFile);
- }
- IoUtil.delete(metadataMaskFile);
+ for (FileReference root : storageRoots) {
+ final Collection<FileReference> metadataMaskFiles =
+ ioManager.getMatchingFiles(root, METADATA_MASK_FILES_FILTER);
+ for (FileReference metadataMaskFile : metadataMaskFiles) {
+ final FileReference resourceFile = metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
+ ioManager.delete(resourceFile);
+ ioManager.delete(metadataMaskFile);
}
}
}
- private void deleteIndexMaskedFiles(File index) throws IOException {
- File[] masks = index.listFiles(MASK_FILES_FILTER);
- if (masks != null) {
- for (File mask : masks) {
- deleteIndexMaskedFiles(index, mask);
- // delete the mask itself
- Files.delete(mask.toPath());
- }
+ private void deleteIndexMaskedFiles(FileReference index) throws IOException {
+ Collection<FileReference> masks = ioManager.getMatchingFiles(index, MASK_FILES_FILTER);
+ for (FileReference mask : masks) {
+ deleteIndexMaskedFiles(index, mask);
+ // delete the mask itself
+ ioManager.delete(mask);
}
}
- private boolean isValidIndex(File index) throws IOException {
+ private boolean isValidIndex(FileReference index) throws IOException {
// any index without any checkpoint files is invalid
// this can happen if a crash happens when the index metadata file is created
// but before the initial checkpoint is persisted. The index metadata file will
@@ -535,46 +516,46 @@
return getIndexCheckpointManager(index).getCheckpointCount() != 0;
}
- private void deleteIndexInvalidComponents(File index) throws IOException, ParseException {
- final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
+ private void deleteIndexInvalidComponents(FileReference index) throws IOException, ParseException {
+ final Collection<FileReference> indexComponentFiles = ioManager.getMatchingFiles(index, COMPONENT_FILES_FILTER);
if (indexComponentFiles == null) {
throw new IOException(index + " doesn't exist or an IO error occurred");
}
final long validComponentSequence = getIndexCheckpointManager(index).getValidComponentSequence();
- for (File componentFile : indexComponentFiles) {
+ for (FileReference componentFileRef : indexComponentFiles) {
// delete any file with start or end sequence > valid component sequence
- final long fileStart = IndexComponentFileReference.of(componentFile.getName()).getSequenceStart();
- final long fileEnd = IndexComponentFileReference.of(componentFile.getName()).getSequenceEnd();
+ final long fileStart = IndexComponentFileReference.of(componentFileRef.getName()).getSequenceStart();
+ final long fileEnd = IndexComponentFileReference.of(componentFileRef.getName()).getSequenceEnd();
if (fileStart > validComponentSequence || fileEnd > validComponentSequence) {
- LOGGER.warn(() -> "Deleting invalid component file " + componentFile.getAbsolutePath()
+ LOGGER.warn(() -> "Deleting invalid component file " + componentFileRef.getAbsolutePath()
+ " based on valid sequence " + validComponentSequence);
- Files.delete(componentFile.toPath());
+ ioManager.delete(componentFileRef);
}
}
}
- private IIndexCheckpointManager getIndexCheckpointManager(File index) throws HyracksDataException {
- final String indexFile = Paths.get(index.getAbsolutePath(), StorageConstants.METADATA_FILE_NAME).toString();
+ private IIndexCheckpointManager getIndexCheckpointManager(FileReference index) throws HyracksDataException {
+ final String indexFile = index.getChild(METADATA_FILE_NAME).getAbsolutePath();
final ResourceReference indexRef = ResourceReference.of(indexFile);
return indexCheckpointManagerProvider.get(indexRef);
}
- private void deleteIndexMaskedFiles(File index, File mask) throws IOException {
- if (!mask.getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
+ private void deleteIndexMaskedFiles(FileReference index, FileReference mask) throws IOException {
+ if (!mask.getFile().getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
throw new IllegalArgumentException("Unrecognized mask file: " + mask);
}
- File[] maskedFiles;
+ Collection<FileReference> maskedFiles;
if (isComponentMask(mask)) {
final String componentId = mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length());
- maskedFiles = index.listFiles((dir, name) -> name.startsWith(componentId));
+ maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> name.startsWith(componentId));
} else {
final String maskedFileName = mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length());
- maskedFiles = index.listFiles((dir, name) -> name.equals(maskedFileName));
+ maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> name.equals(maskedFileName));
}
if (maskedFiles != null) {
- for (File maskedFile : maskedFiles) {
+ for (FileReference maskedFile : maskedFiles) {
LOGGER.info(() -> "deleting masked file: " + maskedFile.getAbsolutePath());
- Files.delete(maskedFile.toPath());
+ ioManager.delete(maskedFile);
}
}
}
@@ -583,13 +564,13 @@
try {
final FileReference resolvedPath = ioManager.resolve(resource.getRelativePath().toString());
long totalSize = 0;
- final File[] indexFiles = resolvedPath.getFile().listFiles();
+ final Collection<FileReference> indexFiles = ioManager.list(resolvedPath);
final Map<String, Long> componentsStats = new HashMap<>();
if (indexFiles != null) {
- for (File file : indexFiles) {
- long fileSize = file.length();
+ for (FileReference file : indexFiles) {
+ long fileSize = ioManager.getSize(file);
totalSize += fileSize;
- if (isComponentFile(resolvedPath.getFile(), file.getName())) {
+ if (isComponentFile(resolvedPath, file.getName())) {
String componentSeq = getComponentSequence(file.getAbsolutePath());
componentsStats.put(componentSeq, componentsStats.getOrDefault(componentSeq, 0L) + fileSize);
}
@@ -621,73 +602,69 @@
}
private void createResourceFileMask(FileReference resourceFile) throws HyracksDataException {
- Path maskFile = getResourceMaskFilePath(resourceFile);
+ FileReference maskFile = getResourceMaskFilePath(resourceFile);
try {
- Files.createFile(maskFile);
+ ioManager.create(maskFile);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
}
private void deleteResourceFileMask(FileReference resourceFile) throws HyracksDataException {
- Path maskFile = getResourceMaskFilePath(resourceFile);
- IoUtil.delete(maskFile);
+ FileReference maskFile = getResourceMaskFilePath(resourceFile);
+ ioManager.delete(maskFile);
}
- private Path getResourceMaskFilePath(FileReference resourceFile) {
- return Paths.get(resourceFile.getFile().getParentFile().getAbsolutePath(), METADATA_FILE_MASK_NAME);
+ private FileReference getResourceMaskFilePath(FileReference resourceFile) {
+ FileReference resourceFileParent = resourceFile.getParent();
+ return resourceFileParent.getChild(METADATA_FILE_MASK_NAME);
}
- private static boolean isComponentMask(File mask) {
+ private static boolean isComponentMask(FileReference mask) {
return mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX);
}
- private static boolean isComponentFile(File indexDir, String fileName) {
- return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
+ private static boolean isComponentFile(FileReference indexDir, String fileName) {
+ return COMPONENT_FILES_FILTER.accept(indexDir.getFile(), fileName);
}
- public List<Path> getStorageRoots() {
- return storageRoots;
- }
-
- public synchronized void keepPartitions(Set<Integer> keepPartitions) {
- List<File> onDiskPartitions = getOnDiskPartitions();
- for (File onDiskPartition : onDiskPartitions) {
+ public synchronized void keepPartitions(Set<Integer> keepPartitions) throws HyracksDataException {
+ List<FileReference> onDiskPartitions = getOnDiskPartitions();
+ for (FileReference onDiskPartition : onDiskPartitions) {
int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
if (!keepPartitions.contains(partitionNum)) {
LOGGER.warn("deleting partition {} since it is not on partitions to keep {}", partitionNum,
keepPartitions);
- FileUtils.deleteQuietly(onDiskPartition);
+ ioManager.delete(onDiskPartition);
}
}
}
- public synchronized List<File> getOnDiskPartitions() {
- List<File> onDiskPartitions = new ArrayList<>();
- for (Path root : storageRoots) {
- File[] partitions = root.toFile().listFiles(
+ public synchronized List<FileReference> getOnDiskPartitions() throws HyracksDataException {
+ List<FileReference> onDiskPartitions = new ArrayList<>();
+ for (FileReference root : storageRoots) {
+ Collection<FileReference> partitions = ioManager.list(root,
(dir, name) -> dir.isDirectory() && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
if (partitions != null) {
- onDiskPartitions.addAll(Arrays.asList(partitions));
+ onDiskPartitions.addAll(partitions);
}
}
return onDiskPartitions;
}
- public Path getPartitionRoot(int partition) throws HyracksDataException {
- Path path =
- Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partition);
- FileReference resolve = ioManager.resolve(path.toString());
- return resolve.getFile().toPath();
+ public FileReference getPartitionRoot(int partition) throws HyracksDataException {
+ String path = StorageConstants.STORAGE_ROOT_DIR_NAME + File.separator + StorageConstants.PARTITION_DIR_PREFIX
+ + partition;
+ return ioManager.resolve(path);
}
- public void deletePartition(int partitionId) {
- List<File> onDiskPartitions = getOnDiskPartitions();
- for (File onDiskPartition : onDiskPartitions) {
+ public void deletePartition(int partitionId) throws HyracksDataException {
+ Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
+ for (FileReference onDiskPartition : onDiskPartitions) {
int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
if (partitionNum == partitionId) {
LOGGER.warn("deleting partition {}", partitionNum);
- FileUtils.deleteQuietly(onDiskPartition);
+ ioManager.delete(onDiskPartition);
return;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
index cec1598..c826ed7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
@@ -97,6 +97,10 @@
return path + File.separator + name;
}
+ public String getName() {
+ return file.getName();
+ }
+
public void register() {
if (registrationTime != 0) {
throw new IllegalStateException(
@@ -125,7 +129,7 @@
if (parentIndex < 0) {
return new FileReference(dev, "");
}
- String parentPath = path.substring(parentIndex);
+ String parentPath = path.substring(0, parentIndex);
return new FileReference(dev, parentPath);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 5f93c21..07e1c78 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -67,6 +67,8 @@
long getSize(IFileHandle fileHandle);
+ long getSize(FileReference fileReference);
+
WritableByteChannel newWritableChannel(IFileHandle fileHandle);
void deleteWorkspaceFiles() throws HyracksDataException;
@@ -134,11 +136,16 @@
void deleteDirectory(FileReference root) throws HyracksDataException;
+ // TODO: Remove and use list
Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter) throws HyracksDataException;
boolean exists(FileReference fileRef);
void create(FileReference fileRef) throws HyracksDataException;
+ boolean makeDirectories(FileReference resourceDir);
+
+ void cleanDirectory(FileReference resourceDir) throws HyracksDataException;
+
void syncFiles(Set<Integer> activePartitions) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index 6c74838..8b3fdec 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -92,6 +92,14 @@
return raf.getChannel();
}
+ public void setLength(long newLength) throws HyracksDataException {
+ try {
+ raf.setLength(newLength);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
public synchronized void ensureOpen() throws HyracksDataException {
if (raf == null || !raf.getChannel().isOpen()) {
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 4eebf6b..24d1061 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -400,7 +400,12 @@
@Override
public long getSize(IFileHandle fileHandle) {
- return fileHandle.getFileReference().getFile().length();
+ return getSize(fileHandle.getFileReference());
+ }
+
+ @Override
+ public long getSize(FileReference fileReference) {
+ return fileReference.getFile().length();
}
@Override
@@ -577,7 +582,7 @@
public Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter)
throws HyracksDataException {
File rootFile = root.getFile();
- if (!rootFile.exists()) {
+ if (!rootFile.exists() || !rootFile.isDirectory()) {
return Collections.emptyList();
}
@@ -601,6 +606,20 @@
}
@Override
+ public boolean makeDirectories(FileReference resourceDir) {
+ return resourceDir.getFile().mkdirs();
+ }
+
+ @Override
+ public void cleanDirectory(FileReference resourceDir) throws HyracksDataException {
+ try {
+ FileUtils.cleanDirectory(resourceDir.getFile());
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
public void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
try {
FileUtils.copyDirectory(srcFileRef.getFile(), destFileRef.getFile());
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 91316dc..08eba0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -61,15 +61,17 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
String baseName = getNextComponentSequence(BTREE_FILTER);
- return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null,
- hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
+ return new LSMComponentFileReferences(getCompressedFileReferenceIfAny(baseName + DELIMITER + BTREE_SUFFIX),
+ null,
+ hasBloomFilter ? getCompressedFileReferenceIfAny(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
}
@Override
public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
- return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null,
- hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
+ return new LSMComponentFileReferences(getCompressedFileReferenceIfAny(baseName + DELIMITER + BTREE_SUFFIX),
+ null,
+ hasBloomFilter ? getCompressedFileReferenceIfAny(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 4da207d..610232f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -25,6 +25,7 @@
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.hyracks.api.compression.ICompressorDecompressor;
import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
@@ -32,7 +33,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
@@ -139,9 +139,9 @@
protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles,
IBufferCache bufferCache) throws HyracksDataException {
- String[] files = listDirFiles(baseDir, filter);
- for (String fileName : files) {
- FileReference fileRef = getFileReference(fileName);
+ Set<FileReference> files = ioManager.list(baseDir, filter);
+ for (FileReference filePath : files) {
+ FileReference fileRef = getCompressedFileReferenceIfAny(filePath.getName());
if (treeFactory == null) {
allFiles.add(IndexComponentFileReference.of(fileRef));
continue;
@@ -155,24 +155,6 @@
}
}
- static String[] listDirFiles(FileReference dir, FilenameFilter filter) throws HyracksDataException {
- /*
- * Returns null if this abstract pathname does not denote a directory, or if an I/O error occurs.
- */
- String[] files = dir.getFile().list(filter);
- if (files == null) {
- if (!dir.getFile().canRead()) {
- throw HyracksDataException.create(ErrorCode.CANNOT_READ_FILE, dir);
- } else if (!dir.getFile().exists()) {
- throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, dir);
- } else if (!dir.getFile().isDirectory()) {
- throw HyracksDataException.create(ErrorCode.FILE_IS_NOT_DIRECTORY, dir);
- }
- throw HyracksDataException.create(ErrorCode.UNIDENTIFIED_IO_ERROR_READING_FILE, dir);
- }
- return files;
- }
-
protected void validateFiles(HashSet<String> groundTruth, ArrayList<IndexComponentFileReference> validFiles,
FilenameFilter filter, TreeIndexFactory<? extends ITreeIndex> treeFactory, IBufferCache bufferCache)
throws HyracksDataException {
@@ -189,15 +171,15 @@
@Override
public void createDirs() throws HyracksDataException {
- if (baseDir.getFile().exists()) {
+ if (ioManager.exists(baseDir)) {
throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_EXISTING_INDEX);
}
- baseDir.getFile().mkdirs();
+ ioManager.makeDirectories(baseDir);
}
@Override
public void deleteDirs() throws HyracksDataException {
- IoUtil.delete(baseDir);
+ ioManager.deleteDirectory(baseDir);
}
@Override
@@ -314,7 +296,7 @@
return IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
}
- protected FileReference getFileReference(String name) {
+ protected FileReference getCompressedFileReferenceIfAny(String name) {
final ICompressorDecompressor compDecomp = compressorDecompressorFactory.createInstance();
//Avoid creating LAF file for NoOpCompressorDecompressor
if (compDecomp != NoOpCompressorDecompressor.INSTANCE && isCompressible(name)) {
@@ -331,9 +313,9 @@
private long getOnDiskLastUsedComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
long maxComponentSeq = -1;
- final String[] files = listDirFiles(baseDir, filenameFilter);
- for (String fileName : files) {
- maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(fileName).getSequenceEnd());
+ final Set<FileReference> files = ioManager.list(baseDir, filenameFilter);
+ for (FileReference file : files) {
+ maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(file).getSequenceEnd());
}
return maxComponentSeq;
}