[ASTERIXDB-2738][STO] Create Mask File Before Merge Operations

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Before starting a merge operation, create a mask file
  (.mask_C_startSeq_endSeq) for the merged component to
  indicate that this component isn't valid yet.
- On the merge operation successful completion, delete the
  merged component mask file.
- In the case of any unexpected failure during the merge
  operation, all files of the failed merged component will
  be deleted on node startup/shutdown, including the mask
  file.
- Halt on any IO opeartion failure.
- Add a test case that ensures only masked merged components
  are deleted but not the original components that were
  supposed to be merged.

Change-Id: I476dd3be5e75468e83044b3aaf0f6c2d8beadf1c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6425
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Luo Chen <cluo8@uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index 17a4f46..9802001 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -20,7 +20,6 @@
 
 import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.LogManager;
@@ -42,8 +41,6 @@
     @Override
     public void operationFailed(ILSMIOOperation operation, Throwable t) {
         LOGGER.error("Operation {} has failed", operation, t);
-        if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
-            ExitUtil.halt(ExitUtil.EC_FLUSH_FAILED);
-        }
+        ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
index 00d2d3d..50118fc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -171,6 +171,59 @@
         Assert.assertFalse(indexMetadataMaskFile.exists());
     }
 
+    @Test
+    public void deleteMaskedMergedFiles() throws Exception {
+        final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+        final String nodeId = ncAppCtx.getServiceContext().getNodeId();
+        final String datasetName = "ds";
+        TestDataUtil.createIdOnlyDataset(datasetName);
+        final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
+        final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId);
+        FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
+        int compSeqStart = 100;
+        int validComponentSequence = 103;
+        // advance valid component seq in checkpoint
+        PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) ncAppCtx.getLocalResourceRepository();
+        LocalResource localResource = localResourceRepository.get(indexPath);
+        DatasetResourceReference drr = DatasetResourceReference.of(localResource);
+        IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
+        IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
+        indexCheckpointManager.advanceValidComponentSequence(validComponentSequence);
+        // create components to be merged
+        String btree = "_b";
+        String filter = "_f";
+        String indexDir = indexDirRef.getFile().getAbsolutePath();
+        for (int i = compSeqStart; i <= validComponentSequence; i++) {
+            String componentId = i + "_" + i;
+            Path btreePath = Paths.get(indexDir, componentId + btree);
+            Path filterPath = Paths.get(indexDir, componentId + filter);
+            Files.createFile(btreePath);
+            Files.createFile(filterPath);
+        }
+        // create masked merged component
+        String mergedComponentId = compSeqStart + "_" + validComponentSequence;
+        Path mergedBtreePath = Paths.get(indexDir, mergedComponentId + btree);
+        Path mergedFilterPath = Paths.get(indexDir, mergedComponentId + filter);
+        Path mergeMaskPath = Paths.get(indexDir, StorageConstants.COMPONENT_MASK_FILE_PREFIX + mergedComponentId);
+        Files.createFile(mergedBtreePath);
+        Files.createFile(mergedFilterPath);
+        Files.createFile(mergeMaskPath);
+        // cleanup storage and ensure merged component files were deleted while individual files still exist
+        DatasetLocalResource lr = (DatasetLocalResource) localResourceRepository.get(indexPath).getResource();
+        localResourceRepository.cleanup(lr.getPartition());
+        Assert.assertFalse(mergedBtreePath.toFile().exists());
+        Assert.assertFalse(mergedFilterPath.toFile().exists());
+        Assert.assertFalse(mergeMaskPath.toFile().exists());
+        for (int i = compSeqStart; i <= validComponentSequence; i++) {
+            String componentId = i + "_" + i;
+            Path btreePath = Paths.get(indexDir, componentId + btree);
+            Path filterPath = Paths.get(indexDir, componentId + filter);
+            Assert.assertTrue(btreePath.toFile().exists());
+            Assert.assertTrue(filterPath.toFile().exists());
+        }
+    }
+
     private void ensureInvalidComponentDeleted(String indexDir, String componentSeq,
             PersistentLocalResourceRepository localResourceRepository, DatasetLocalResource lr) throws IOException {
         Path btreePath = Paths.get(indexDir,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 0aa46a8..c3737da 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -19,6 +19,10 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
+import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Deque;
@@ -28,8 +32,10 @@
 import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -83,7 +89,15 @@
 
     @Override
     public void beforeOperation(ILSMIOOperation operation) throws HyracksDataException {
-        // No Op
+        if (isMerge(operation)) {
+            FileReference operationMaskFilePath = getOperationMaskFilePath(operation);
+            // if a merge operation is attempted after a failure, its mask file may already exists
+            if (!operationMaskFilePath.getFile().exists()) {
+                IoUtil.create(operationMaskFilePath);
+            } else {
+                LOGGER.warn("merge operation mask file {} already exists", operationMaskFilePath);
+            }
+        }
     }
 
     @Override
@@ -121,6 +135,8 @@
         } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH
                 || operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
             addComponentToCheckpoint(operation);
+        } else if (isMerge(operation)) {
+            IoUtil.delete(getOperationMaskFilePath(operation));
         }
     }
 
@@ -277,4 +293,18 @@
     public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
         // no op
     }
+
+    private boolean isMerge(ILSMIOOperation operation) {
+        return operation.getIOOpertionType() == LSMIOOperationType.MERGE
+                && operation.getAccessor().getOpContext().getOperation() != IndexOperation.DELETE_COMPONENTS;
+    }
+
+    private static FileReference getOperationMaskFilePath(ILSMIOOperation operation) {
+        FileReference target = operation.getTarget();
+        final String componentSequence = getComponentSequence(target.getFile().getAbsolutePath());
+        Path idxRelPath = Paths.get(target.getRelativePath()).getParent();
+        Path maskFileRelPath =
+                Paths.get(idxRelPath.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
+        return new FileReference(target.getDeviceHandle(), maskFileRelPath.toString());
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index ae949fe..a0de153 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -24,6 +24,7 @@
 
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 
 public class ResourceReference {
 
@@ -130,4 +131,17 @@
     public String toString() {
         return getRelativePath().toString();
     }
+
+    /**
+     * Gets a component sequence based on its unique timestamp.
+     * e.g. a component file 1_3_b
+     * will return a component sequence 1_3
+     *
+     * @param componentFile any component file
+     * @return The component sequence
+     */
+    public static String getComponentSequence(String componentFile) {
+        final ResourceReference ref = of(componentFile);
+        return IndexComponentFileReference.of(ref.getName()).getSequence();
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index 181a75b..3f04bd2 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -28,10 +28,10 @@
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.replication.api.IReplicaTask;
 import org.apache.asterix.replication.api.IReplicationWorker;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
@@ -63,7 +63,7 @@
         final IIOManager ioManager = appCtx.getIoManager();
         final FileReference localPath = ioManager.resolve(componentFile);
         final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
-        final String componentSequence = PersistentLocalResourceRepository.getComponentSequence(componentFile);
+        final String componentSequence = ResourceReference.getComponentSequence(componentFile);
         return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 31f5171..145be86 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.resource;
 
+import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
 import static org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX;
 import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
 import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
@@ -593,19 +594,6 @@
         return Paths.get(resourceFile.getFile().getParentFile().getAbsolutePath(), METADATA_FILE_MASK_NAME);
     }
 
-    /**
-     * Gets a component sequence based on its unique timestamp.
-     * e.g. a component file 1_3_b
-     * will return a component sequence 1_3
-     *
-     * @param componentFile any component file
-     * @return The component sequence
-     */
-    public static String getComponentSequence(String componentFile) {
-        final ResourceReference ref = ResourceReference.of(componentFile);
-        return IndexComponentFileReference.of(ref.getName()).getSequence();
-    }
-
     private static boolean isComponentMask(File mask) {
         return mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index f8bc9f9..4991f86 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -58,7 +58,7 @@
     public static final int EC_IO_SCHEDULER_FAILED = 55;
     public static final int EC_HALT_SHUTDOWN_TIMED_OUT = 66;
     public static final int EC_HALT_WATCHDOG_FAILED = 77;
-    public static final int EC_FLUSH_FAILED = 88;
+    public static final int EC_IO_OPERATION_FAILED = 88;
     public static final int EC_TERMINATE_NC_SERVICE_DIRECTIVE = 99;
     private static final ExitThread exitThread = new ExitThread();
     private static final ShutdownWatchdog watchdogThread = new ShutdownWatchdog();