[ASTERIXDB-3196][TX] Support atomic Txn with no WAL
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
- Prevent concurrent metadata catalog modification.
- Introduce atomic txn without WAL that will persist
all records on commit or delete on abort.
- Compute-storage separation fixes.
Change-Id: Icfd034a4dc0b6464564e2129a166e4ceb0dc7b41
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17583
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 2ed1638..372cf69 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -154,7 +154,7 @@
}
if (checkpoints.isEmpty()) {
LOGGER.warn("Couldn't find any checkpoint file for index {}. Content of dir are {}.", indexPath,
- ioManager.getMatchingFiles(indexPath, IoUtil.NO_OP_FILTER).toString());
+ ioManager.list(indexPath, IoUtil.NO_OP_FILTER).toString());
throw new IllegalStateException("Couldn't find any checkpoints for resource: " + indexPath);
}
checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
@@ -182,7 +182,7 @@
private List<IndexCheckpoint> getCheckpoints() throws ClosedByInterruptException, HyracksDataException {
List<IndexCheckpoint> checkpoints = new ArrayList<>();
- final Collection<FileReference> checkpointFiles = ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+ final Collection<FileReference> checkpointFiles = ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
if (!checkpointFiles.isEmpty()) {
for (FileReference checkpointFile : checkpointFiles) {
try {
@@ -229,8 +229,7 @@
private void deleteHistory(long latestId, int historyToKeep) {
try {
- final Collection<FileReference> checkpointFiles =
- ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+ final Collection<FileReference> checkpointFiles = ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
if (!checkpointFiles.isEmpty()) {
for (FileReference checkpointFile : checkpointFiles) {
if (getCheckpointIdFromFileName(checkpointFile) < (latestId - historyToKeep)) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
index cc69bb1..eb2b05b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
@@ -27,7 +27,7 @@
public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, "cc-cloud-storage.conf");
public static void main(String[] args) throws Exception {
- // CloudUtils.startS3CloudEnvironment();
+ CloudUtils.startS3CloudEnvironment();
final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
try {
integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"),
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index c4390fa..38fdf56 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -19,7 +19,6 @@
package org.apache.asterix.test.dataflow;
import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -70,20 +69,10 @@
}
}
- static void setFinal(Field field, Object obj, Object newValue) throws Exception {
- field.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
- field.set(obj, newValue);
- }
-
@SuppressWarnings({ "rawtypes", "unchecked" })
static void replaceMapEntry(Field field, Object obj, Object key, Object value)
throws Exception, IllegalAccessException {
field.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
Map map = (Map) field.get(obj);
map.put(key, value);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
index f87adba..799da68 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
@@ -185,7 +185,8 @@
// Add the remaining files that are not stored locally (if any)
for (String cloudFile : cloudFiles) {
- localFiles.add(dir.getChild(IoUtil.getFileNameFromPath(cloudFile)));
+ localFiles.add(new FileReference(dir.getDeviceHandle(),
+ cloudFile.substring(cloudFile.indexOf(dir.getRelativePath()))));
}
return new HashSet<>(localFiles);
}
@@ -244,12 +245,11 @@
return super.doSyncRead(fHandle, offset, data);
}
- // TODO: We need to download this too
@Override
public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
if (!fileRef.getFile().exists()) {
- // TODO(htowaileb): if it does not exist, download (lazy)
- // TODO(htowaileb): make sure downloading the file is synchronous since many can request it at the same time
+ IFileHandle open = open(fileRef, FileReadWriteMode.READ_WRITE, FileSyncMode.METADATA_SYNC_DATA_SYNC);
+ fileRef = open.getFileReference();
}
return super.readAllBytes(fileRef);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b0d8e02..2704e64 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +38,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -297,6 +299,31 @@
return "Dataset (" + datasetID + "), Partition (" + partition + ")";
}
+ public void deleteMemoryComponent() throws HyracksDataException {
+ Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
+ ILSMIndex primaryLsmIndex = null;
+ for (ILSMIndex lsmIndex : indexes) {
+ if (lsmIndex.isPrimaryIndex()) {
+ if (lsmIndex.isCurrentMutableComponentEmpty()) {
+ LOGGER.info("Primary index on dataset {} and partition {} is empty... skipping delete",
+ dsInfo.getDatasetID(), partition);
+ return;
+ }
+ primaryLsmIndex = lsmIndex;
+ break;
+ }
+ }
+ Objects.requireNonNull(primaryLsmIndex, "no primary index found in " + indexes);
+ idGenerator.refresh();
+ ILSMComponentId nextComponentId = idGenerator.getId();
+ Map<String, Object> flushMap = new HashMap<>();
+ flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
+ flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
+ ILSMIndexAccessor accessor = primaryLsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.getOpContext().setParameters(flushMap);
+ accessor.deleteComponents(c -> c.getType() == ILSMComponent.LSMComponentType.MEMORY);
+ }
+
private boolean canSafelyFlush() {
return numActiveOperations.get() == 0;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index 940535f..34d7caa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.transactions;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -145,4 +147,16 @@
* so that any resources held by the transaction may be released
*/
void complete();
+
+ /**
+ * Acquires {@code lock} write lock and sets the transactions as a write transaction
+ * @param lock
+ */
+ void acquireExclusiveWriteLock(ReentrantLock lock);
+
+ /**
+ * Determines if this tx uses WAL
+ * @return true if this tx uses WAL. Otherwise, false.
+ */
+ boolean hasWAL();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 396d3f6..30c693d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -43,6 +43,10 @@
*/
ATOMIC,
/**
+ * all records are committed and persisted to disk or nothing
+ */
+ ATOMIC_NO_WAL,
+ /**
* any record with entity commit log
*/
ENTITY_LEVEL
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 2195f88..522fd32 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -116,6 +117,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.fulltext.FullTextConfigDescriptor;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
+import org.apache.asterix.transaction.management.opcallbacks.NoOpModificationOpCallback;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallback;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
@@ -161,6 +163,8 @@
private transient MetadataTupleTranslatorProvider tupleTranslatorProvider;
// extension only
private Map<ExtensionMetadataDatasetId, ExtensionMetadataDataset<?>> extensionDatasets;
+ private final ReentrantLock metadataModificationLock = new ReentrantLock(true);
+ private boolean atomicNoWAL;
public static final MetadataNode INSTANCE = new MetadataNode();
@@ -184,6 +188,7 @@
}
}
this.txnIdFactory = new CachingTxnIdFactory(runtimeContext);
+ atomicNoWAL = runtimeContext.isCloudDeployment();
}
public int getMetadataStoragePartition() {
@@ -192,7 +197,8 @@
@Override
public void beginTransaction(TxnId transactionId) {
- TransactionOptions options = new TransactionOptions(AtomicityLevel.ATOMIC);
+ AtomicityLevel lvl = atomicNoWAL ? AtomicityLevel.ATOMIC_NO_WAL : AtomicityLevel.ATOMIC;
+ TransactionOptions options = new TransactionOptions(lvl);
transactionSubsystem.getTransactionManager().beginTransaction(transactionId, options);
}
@@ -501,11 +507,7 @@
if (!force) {
confirmFullTextFilterCanBeDeleted(txnId, dataverseName, filterName);
}
-
try {
- FullTextFilterMetadataEntityTupleTranslator translator =
- tupleTranslatorProvider.getFullTextFilterTupleTranslator(true);
-
ITupleReference key = createTuple(dataverseName.getCanonicalForm(), filterName);
deleteTupleFromIndex(txnId, MetadataPrimaryIndexes.FULL_TEXT_FILTER_DATASET, key);
} catch (HyracksDataException e) {
@@ -612,7 +614,7 @@
IModificationOperationCallback modCallback = createIndexModificationCallback(op, txnCtx, metadataIndex);
IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
- txnCtx.setWriteTxn(true);
+ txnCtx.acquireExclusiveWriteLock(metadataModificationLock);
txnCtx.register(metadataIndex.getResourceId(),
StoragePathUtil.getPartitionNumFromRelativePath(resourceName), lsmIndex, modCallback,
metadataIndex.isPrimaryIndex());
@@ -640,6 +642,12 @@
switch (indexOp) {
case INSERT:
case DELETE:
+ if (!txnCtx.hasWAL()) {
+ return new NoOpModificationOpCallback(metadataIndex.getDatasetId(),
+ metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
+ transactionSubsystem, metadataIndex.getResourceId(), metadataStoragePartition,
+ ResourceType.LSM_BTREE, indexOp);
+ }
/*
* Regardless of the index type (primary or secondary index), secondary index modification
* callback is given. This is still correct since metadata index operation doesn't require
@@ -650,6 +658,12 @@
transactionSubsystem, metadataIndex.getResourceId(), metadataStoragePartition,
ResourceType.LSM_BTREE, indexOp);
case UPSERT:
+ if (!txnCtx.hasWAL()) {
+ return new NoOpModificationOpCallback(metadataIndex.getDatasetId(),
+ metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
+ transactionSubsystem, metadataIndex.getResourceId(), metadataStoragePartition,
+ ResourceType.LSM_BTREE, indexOp);
+ }
return new UpsertOperationCallback(metadataIndex.getDatasetId(), metadataIndex.getPrimaryKeyIndexes(),
txnCtx, transactionSubsystem.getLockManager(), transactionSubsystem,
metadataIndex.getResourceId(), metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index a6ba0e2..11bac0d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -71,7 +71,7 @@
// Get most recent sequence of existing files to avoid deletion
FileReference indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
Collection<FileReference> files =
- ioManager.getMatchingFiles(indexPath, AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+ ioManager.list(indexPath, AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
if (files == null) {
throw HyracksDataException
.create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java
new file mode 100644
index 0000000..ad77939
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.opcallbacks;
+
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class NoOpModificationOpCallback extends AbstractIndexModificationOperationCallback {
+
+ public NoOpModificationOpCallback(DatasetId datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
+ ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
+ byte resourceType, Operation indexOp) {
+ super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
+ resourceType, indexOp);
+ }
+
+ @Override
+ public void before(ITupleReference tuple) throws HyracksDataException {
+ // no op
+ }
+
+ @Override
+ public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
+ // no op
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 25b9610..4e71b1c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -63,6 +63,7 @@
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
@@ -240,7 +241,7 @@
List<FileReference> roots) throws HyracksDataException {
Map<Long, LocalResource> resourcesMap = new HashMap<>();
for (FileReference root : roots) {
- final Collection<FileReference> files = ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
+ final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER);
try {
for (FileReference file : files) {
final LocalResource localResource = readLocalResource(file);
@@ -274,7 +275,7 @@
public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException {
for (FileReference root : storageRoots) {
- final Collection<FileReference> files = ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
+ final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER);
try {
for (FileReference file : files) {
final LocalResource localResource = readLocalResource(file);
@@ -452,7 +453,7 @@
private List<String> getIndexFiles(FileReference indexDir) throws HyracksDataException {
final List<String> indexFiles = new ArrayList<>();
- Collection<FileReference> indexFilteredFiles = ioManager.getMatchingFiles(indexDir, LSM_INDEX_FILES_FILTER);
+ Collection<FileReference> indexFilteredFiles = ioManager.list(indexDir, LSM_INDEX_FILES_FILTER);
indexFilteredFiles.stream().map(FileReference::getAbsolutePath).forEach(indexFiles::add);
return indexFiles;
}
@@ -492,8 +493,7 @@
public synchronized void deleteCorruptedResources() throws HyracksDataException {
for (FileReference root : storageRoots) {
- final Collection<FileReference> metadataMaskFiles =
- ioManager.getMatchingFiles(root, METADATA_MASK_FILES_FILTER);
+ final Collection<FileReference> metadataMaskFiles = ioManager.list(root, METADATA_MASK_FILES_FILTER);
for (FileReference metadataMaskFile : metadataMaskFiles) {
final FileReference resourceFile = metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
ioManager.delete(resourceFile);
@@ -503,7 +503,7 @@
}
private void deleteIndexMaskedFiles(FileReference index) throws IOException {
- Collection<FileReference> masks = ioManager.getMatchingFiles(index, MASK_FILES_FILTER);
+ Collection<FileReference> masks = ioManager.list(index, MASK_FILES_FILTER);
for (FileReference mask : masks) {
deleteIndexMaskedFiles(index, mask);
// delete the mask itself
@@ -520,7 +520,7 @@
}
private void deleteIndexInvalidComponents(FileReference index) throws IOException, ParseException {
- final Collection<FileReference> indexComponentFiles = ioManager.getMatchingFiles(index, COMPONENT_FILES_FILTER);
+ final Collection<FileReference> indexComponentFiles = ioManager.list(index, COMPONENT_FILES_FILTER);
if (indexComponentFiles == null) {
throw new IOException(index + " doesn't exist or an IO error occurred");
}
@@ -550,10 +550,10 @@
Collection<FileReference> maskedFiles;
if (isComponentMask(mask)) {
final String componentId = mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length());
- maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> name.startsWith(componentId));
+ maskedFiles = ioManager.list(index, (dir, name) -> name.startsWith(componentId));
} else {
final String maskedFileName = mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length());
- maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> name.equals(maskedFileName));
+ maskedFiles = ioManager.list(index, (dir, name) -> name.equals(maskedFileName));
}
if (maskedFiles != null) {
for (FileReference maskedFile : maskedFiles) {
@@ -643,14 +643,11 @@
}
}
- public synchronized List<FileReference> getOnDiskPartitions() throws HyracksDataException {
+ public synchronized List<FileReference> getOnDiskPartitions() {
List<FileReference> onDiskPartitions = new ArrayList<>();
for (FileReference root : storageRoots) {
- Collection<FileReference> partitions = ioManager.list(root, (dir, name) -> dir != null && dir.isDirectory()
- && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
- if (partitions != null) {
- onDiskPartitions.addAll(partitions);
- }
+ onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir, name) -> dir != null && dir.isDirectory()
+ && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)));
}
return onDiskPartitions;
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 104f9a7..8a1856c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -23,8 +23,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.asterix.common.context.ITransactionOperationTracker;
+import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TxnId;
@@ -42,6 +44,7 @@
private final AtomicInteger txnState;
private final AtomicBoolean isWriteTxn;
private volatile boolean isTimeout;
+ private ReentrantLock exclusiveLock;
protected AbstractTransactionContext(TxnId txnId) {
this.txnId = txnId;
@@ -90,6 +93,21 @@
}
@Override
+ public void acquireExclusiveWriteLock(ReentrantLock exclusiveLock) {
+ if (isWriteTxn.get()) {
+ return;
+ }
+ try {
+ exclusiveLock.lockInterruptibly();
+ this.exclusiveLock = exclusiveLock;
+ setWriteTxn(true);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ACIDException(e);
+ }
+ }
+
+ @Override
public void setWriteTxn(boolean isWriteTxn) {
this.isWriteTxn.set(isWriteTxn);
}
@@ -114,6 +132,9 @@
synchronized (txnOpTrackers) {
txnOpTrackers.forEach((resource, opTracker) -> opTracker.afterTransaction(resource));
}
+ if (exclusiveLock != null) {
+ exclusiveLock.unlock();
+ }
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
new file mode 100644
index 0000000..cfa39c6
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
+
+ public AtomicNoWALTransactionContext(TxnId txnId) {
+ super(txnId);
+ }
+
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ final int txnState = getTxnState();
+ switch (txnState) {
+ case ITransactionManager.ABORTED:
+ deleteUncommittedRecords();
+ break;
+ case ITransactionManager.COMMITTED:
+ ensureDurable();
+ break;
+ default:
+ throw new IllegalStateException("invalid state in txn clean up: " + getTxnState());
+ }
+ }
+
+ private void deleteUncommittedRecords() {
+ for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+ PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
+ try {
+ primaryIndexOpTracker.deleteMemoryComponent();
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ }
+ }
+
+ private void ensureDurable() {
+ List<FlushOperation> flushes = new ArrayList<>();
+ LogRecord dummyLogRecord = new LogRecord();
+ try {
+ for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+ PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
+ primaryIndexOpTracker.triggerScheduleFlush(dummyLogRecord);
+ flushes.addAll(primaryIndexOpTracker.getScheduledFlushes());
+ }
+ LSMIndexUtil.waitFor(flushes);
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ }
+
+ @Override
+ public boolean hasWAL() {
+ return false;
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 083c26b..870a76b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -18,7 +18,10 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +41,7 @@
private final Map<Long, ILSMOperationTracker> opTrackers = new ConcurrentHashMap<>();
private final Map<Long, AtomicInteger> indexPendingOps = new ConcurrentHashMap<>();
private final Map<Long, IModificationOperationCallback> callbacks = new ConcurrentHashMap<>();
+ protected final Set<ILSMOperationTracker> modifiedIndexes = Collections.synchronizedSet(new HashSet<>());
public AtomicTransactionContext(TxnId txnId) {
super(txnId);
@@ -64,6 +68,7 @@
@Override
public void beforeOperation(long resourceId) {
indexPendingOps.get(resourceId).incrementAndGet();
+ modifiedIndexes.add(opTrackers.get(resourceId));
}
@Override
@@ -110,4 +115,9 @@
AtomicTransactionContext that = (AtomicTransactionContext) o;
return this.txnId.equals(that.txnId);
}
+
+ @Override
+ public boolean hasWAL() {
+ return true;
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index be5874e..2e5f6dd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -113,4 +113,9 @@
EntityLevelTransactionContext that = (EntityLevelTransactionContext) o;
return this.txnId.equals(that.txnId);
}
+
+ @Override
+ public boolean hasWAL() {
+ return true;
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
index 4a465a4..1076086 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
@@ -34,6 +34,8 @@
switch (atomicityLevel) {
case ATOMIC:
return new AtomicTransactionContext(txnId);
+ case ATOMIC_NO_WAL:
+ return new AtomicNoWALTransactionContext(txnId);
case ENTITY_LEVEL:
return new EntityLevelTransactionContext(txnId);
default:
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index ee65962..8bbfbfd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -81,9 +81,11 @@
final ITransactionContext txnCtx = getTransactionContext(txnId);
try {
if (txnCtx.isWriteTxn()) {
- LogRecord logRecord = new LogRecord();
- TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, true);
- txnSubsystem.getLogManager().log(logRecord);
+ if (txnCtx.hasWAL()) {
+ LogRecord logRecord = new LogRecord();
+ TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, true);
+ txnSubsystem.getLogManager().log(logRecord);
+ }
txnCtx.setTxnState(ITransactionManager.COMMITTED);
}
} catch (Exception e) {
@@ -103,13 +105,15 @@
final ITransactionContext txnCtx = getTransactionContext(txnId);
try {
if (txnCtx.isWriteTxn()) {
- if (txnCtx.getFirstLSN() != TERMINAL_LSN) {
- LogRecord logRecord = new LogRecord();
- TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
- txnSubsystem.getLogManager().log(logRecord);
- txnSubsystem.getCheckpointManager().secure(txnId);
+ if (txnCtx.hasWAL()) {
+ if (txnCtx.getFirstLSN() != TERMINAL_LSN) {
+ LogRecord logRecord = new LogRecord();
+ TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
+ txnSubsystem.getLogManager().log(logRecord);
+ txnSubsystem.getCheckpointManager().secure(txnId);
+ }
+ txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
}
- txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
txnCtx.setTxnState(ITransactionManager.ABORTED);
}
} catch (HyracksDataException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 63226ac..10f1637 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -23,7 +23,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.WritableByteChannel;
-import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -125,6 +124,13 @@
Set<FileReference> list(FileReference dir) throws HyracksDataException;
+ /**
+ * Lists the files matching {@code filter} recursively starting from {@code dir}
+ * @param dir
+ * @param filter
+ * @return the matching files
+ * @throws HyracksDataException
+ */
Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException;
void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException;
@@ -136,9 +142,6 @@
void deleteDirectory(FileReference root) throws HyracksDataException;
- // TODO: Remove and use list
- Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter) throws HyracksDataException;
-
boolean exists(FileReference fileRef) throws HyracksDataException;
void create(FileReference fileRef) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 7644a30..05e6544 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -29,8 +29,11 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -194,4 +197,18 @@
public static String getFileNameFromPath(String path) {
return path.substring(path.lastIndexOf('/') + 1);
}
+
+ public static Collection<FileReference> getMatchingChildren(FileReference root, FilenameFilter filter) {
+ if (!root.getFile().isDirectory()) {
+ throw new IllegalArgumentException("Parameter 'root' is not a directory: " + root);
+ }
+ Objects.requireNonNull(filter);
+ List<FileReference> files = new ArrayList<>();
+ String[] matchingFiles = root.getFile().list(filter);
+ if (matchingFiles != null) {
+ files.addAll(Arrays.stream(matchingFiles).map(pDir -> new FileReference(root.getDeviceHandle(), pDir))
+ .collect(Collectors.toList()));
+ }
+ return files;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 19fbff3..1733ad4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -517,27 +517,13 @@
@Override
public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
- /*
- * Throws an error if this abstract pathname does not denote a directory, or if an I/O error occurs.
- * Returns an empty set if the file does not exist, otherwise, returns the files in the specified directory
- */
Set<FileReference> listedFiles = new HashSet<>();
if (!dir.getFile().exists()) {
return listedFiles;
}
-
- String[] files = dir.getFile().list(filter);
- if (files == null) {
- if (!dir.getFile().canRead()) {
- throw HyracksDataException.create(ErrorCode.CANNOT_READ_FILE, dir);
- } else if (!dir.getFile().isDirectory()) {
- throw HyracksDataException.create(ErrorCode.FILE_IS_NOT_DIRECTORY, dir);
- }
- throw HyracksDataException.create(ErrorCode.UNIDENTIFIED_IO_ERROR_READING_FILE, dir);
- }
-
- for (String file : files) {
- listedFiles.add(dir.getChild(file));
+ Collection<File> files = IoUtil.getMatchingFiles(dir.getFile().toPath(), filter);
+ for (File file : files) {
+ listedFiles.add(resolveAbsolutePath(file.getAbsolutePath()));
}
return listedFiles;
}
@@ -579,23 +565,6 @@
}
@Override
- public Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter)
- throws HyracksDataException {
- File rootFile = root.getFile();
- if (!rootFile.exists() || !rootFile.isDirectory()) {
- return Collections.emptyList();
- }
-
- Collection<File> files = IoUtil.getMatchingFiles(rootFile.toPath(), filter);
- Set<FileReference> fileReferences = new HashSet<>();
- for (File file : files) {
- fileReferences.add(resolveAbsolutePath(file.getAbsolutePath()));
- }
-
- return fileReferences;
- }
-
- @Override
public boolean exists(FileReference fileRef) throws HyracksDataException {
return fileRef.getFile().exists();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index 45bfed1..1c0d7b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -23,7 +23,6 @@
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.common.IIndex;
@@ -101,7 +100,7 @@
LOGGER.warn(
"Deleting {} on index create. The index is not registered but the file exists in the filesystem",
resolvedResourceRef);
- IoUtil.delete(resolvedResourceRef);
+ ctx.getIoManager().delete(resolvedResourceRef);
}
index = resource.createInstance(ctx);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index 6a4f10d..1bceea3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -42,8 +42,8 @@
protected final BufferCache bufferCache;
protected final IPageReplacementStrategy pageReplacementStrategy;
+ protected final IOManager ioManager;
private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache;
- private final IOManager ioManager;
private IFileHandle fileHandle;
private volatile boolean hasOpen;
@@ -193,7 +193,7 @@
}
}
- public static void deleteFile(FileReference fileRef) throws HyracksDataException {
+ public static void deleteFile(FileReference fileRef, IIOManager ioManager) throws HyracksDataException {
HyracksDataException savedEx = null;
/*
@@ -206,7 +206,7 @@
final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
final FileReference lafFileRef = cFileRef.getLAFFileReference();
if (lafFileRef.getFile().exists()) {
- IoUtil.delete(lafFileRef);
+ ioManager.delete(lafFileRef);
}
}
} catch (HyracksDataException e) {
@@ -214,7 +214,7 @@
}
try {
- IoUtil.delete(fileRef);
+ ioManager.delete(fileRef);
} catch (HyracksDataException e) {
if (savedEx != null) {
savedEx.addSuppressed(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 70500e5..10284f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -43,7 +43,6 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.replication.IIOReplicationManager;
import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.IFileMapManager;
@@ -761,7 +760,7 @@
} catch (Exception e) {
// If file registration failed for any reason, we need to undo the file creation
try {
- IoUtil.delete(fileRef);
+ ioManager.delete(fileRef);
} catch (Exception deleteException) {
e.addSuppressed(deleteException);
}
@@ -960,7 +959,7 @@
if (mapped) {
deleteFile(fileId);
} else {
- BufferedFileHandle.deleteFile(fileRef);
+ BufferedFileHandle.deleteFile(fileRef, ioManager);
}
}
@@ -991,7 +990,7 @@
fInfo.markAsDeleted();
}
} finally {
- BufferedFileHandle.deleteFile(fileRef);
+ BufferedFileHandle.deleteFile(fileRef, ioManager);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
index 7d0cc62..d78ac24 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.compression.ICompressorDecompressor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal;
@@ -64,6 +65,7 @@
private final IBufferCache bufferCache;
private final ICompressorDecompressor compressorDecompressor;
private final CompressedFileReference fileRef;
+ private final IIOManager ioManager;
private int fileId;
private State state;
@@ -71,12 +73,13 @@
private LAFWriter lafWriter;
- public CompressedFileManager(IBufferCache bufferCache, CompressedFileReference fileRef) {
+ public CompressedFileManager(IBufferCache bufferCache, CompressedFileReference fileRef, IIOManager ioManager) {
state = State.CLOSED;
totalNumOfPages = 0;
this.bufferCache = bufferCache;
this.fileRef = fileRef;
this.compressorDecompressor = fileRef.getCompressorDecompressor();
+ this.ioManager = ioManager;
}
/* ************************
@@ -99,7 +102,7 @@
ensureState(CLOSED);
boolean open = false;
- if (fileRef.getLAFFileReference().getFile().exists()) {
+ if (ioManager.exists(fileRef.getLAFFileReference())) {
fileId = bufferCache.openFile(fileRef.getLAFFileReference());
open = true;
} else {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index b6178c5..01811c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -157,7 +157,7 @@
@Override
public void open(FileReference fileRef) throws HyracksDataException {
final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
- compressedFileManager = new CompressedFileManager(bufferCache, cFileRef);
+ compressedFileManager = new CompressedFileManager(bufferCache, cFileRef, ioManager);
compressedFileManager.open();
super.open(fileRef);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
index ddb5717..de8bdc8 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.storage.am.common;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.util.Log4j2Monitor;
import org.apache.logging.log4j.Level;
@@ -97,7 +96,6 @@
index.destroy();
Assert.assertFalse(persistentStateExists());
index.destroy();
- Assert.assertTrue(Log4j2Monitor.count(IoUtil.FILE_NOT_FOUND_MSG) > 0);
Assert.assertFalse(persistentStateExists());
}