Merge branch 'gerrit/mad-hatter'

Change-Id: I1ed2ad00b1fb6ef5fe70e7a1d6a753d8da59e269
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index 17a4f46..9802001 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -20,7 +20,6 @@
 
 import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.LogManager;
@@ -42,8 +41,6 @@
     @Override
     public void operationFailed(ILSMIOOperation operation, Throwable t) {
         LOGGER.error("Operation {} has failed", operation, t);
-        if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
-            ExitUtil.halt(ExitUtil.EC_FLUSH_FAILED);
-        }
+        ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
index 00d2d3d..50118fc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -171,6 +171,59 @@
         Assert.assertFalse(indexMetadataMaskFile.exists());
     }
 
+    @Test
+    public void deleteMaskedMergedFiles() throws Exception {
+        final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+        final String nodeId = ncAppCtx.getServiceContext().getNodeId();
+        final String datasetName = "ds";
+        TestDataUtil.createIdOnlyDataset(datasetName);
+        final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
+        final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId);
+        FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
+        int compSeqStart = 100;
+        int validComponentSequence = 103;
+        // advance valid component seq in checkpoint
+        PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) ncAppCtx.getLocalResourceRepository();
+        LocalResource localResource = localResourceRepository.get(indexPath);
+        DatasetResourceReference drr = DatasetResourceReference.of(localResource);
+        IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
+        IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
+        indexCheckpointManager.advanceValidComponentSequence(validComponentSequence);
+        // create components to be merged
+        String btree = "_b";
+        String filter = "_f";
+        String indexDir = indexDirRef.getFile().getAbsolutePath();
+        for (int i = compSeqStart; i <= validComponentSequence; i++) {
+            String componentId = i + "_" + i;
+            Path btreePath = Paths.get(indexDir, componentId + btree);
+            Path filterPath = Paths.get(indexDir, componentId + filter);
+            Files.createFile(btreePath);
+            Files.createFile(filterPath);
+        }
+        // create masked merged component
+        String mergedComponentId = compSeqStart + "_" + validComponentSequence;
+        Path mergedBtreePath = Paths.get(indexDir, mergedComponentId + btree);
+        Path mergedFilterPath = Paths.get(indexDir, mergedComponentId + filter);
+        Path mergeMaskPath = Paths.get(indexDir, StorageConstants.COMPONENT_MASK_FILE_PREFIX + mergedComponentId);
+        Files.createFile(mergedBtreePath);
+        Files.createFile(mergedFilterPath);
+        Files.createFile(mergeMaskPath);
+        // cleanup storage and ensure merged component files were deleted while individual files still exist
+        DatasetLocalResource lr = (DatasetLocalResource) localResourceRepository.get(indexPath).getResource();
+        localResourceRepository.cleanup(lr.getPartition());
+        Assert.assertFalse(mergedBtreePath.toFile().exists());
+        Assert.assertFalse(mergedFilterPath.toFile().exists());
+        Assert.assertFalse(mergeMaskPath.toFile().exists());
+        for (int i = compSeqStart; i <= validComponentSequence; i++) {
+            String componentId = i + "_" + i;
+            Path btreePath = Paths.get(indexDir, componentId + btree);
+            Path filterPath = Paths.get(indexDir, componentId + filter);
+            Assert.assertTrue(btreePath.toFile().exists());
+            Assert.assertTrue(filterPath.toFile().exists());
+        }
+    }
+
     private void ensureInvalidComponentDeleted(String indexDir, String componentSeq,
             PersistentLocalResourceRepository localResourceRepository, DatasetLocalResource lr) throws IOException {
         Path btreePath = Paths.get(indexDir,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index e3bd13d..45594eb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -463,7 +463,7 @@
                         scheduleFlush();
                     } catch (Throwable e) {
                         LOGGER.error("Unexpected exception when trying to schedule flushes.", e);
-                        ExitUtil.halt(ExitUtil.EC_FLUSH_FAILED);
+                        ExitUtil.halt(ExitUtil.EC_IO_SCHEDULER_FAILED);
                     }
                 }
             }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 0aa46a8..c3737da 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -19,6 +19,10 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
+import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Deque;
@@ -28,8 +32,10 @@
 import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -83,7 +89,15 @@
 
     @Override
     public void beforeOperation(ILSMIOOperation operation) throws HyracksDataException {
-        // No Op
+        if (isMerge(operation)) {
+            FileReference operationMaskFilePath = getOperationMaskFilePath(operation);
+            // if a merge operation is attempted after a failure, its mask file may already exists
+            if (!operationMaskFilePath.getFile().exists()) {
+                IoUtil.create(operationMaskFilePath);
+            } else {
+                LOGGER.warn("merge operation mask file {} already exists", operationMaskFilePath);
+            }
+        }
     }
 
     @Override
@@ -121,6 +135,8 @@
         } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH
                 || operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
             addComponentToCheckpoint(operation);
+        } else if (isMerge(operation)) {
+            IoUtil.delete(getOperationMaskFilePath(operation));
         }
     }
 
@@ -277,4 +293,18 @@
     public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
         // no op
     }
+
+    private boolean isMerge(ILSMIOOperation operation) {
+        return operation.getIOOpertionType() == LSMIOOperationType.MERGE
+                && operation.getAccessor().getOpContext().getOperation() != IndexOperation.DELETE_COMPONENTS;
+    }
+
+    private static FileReference getOperationMaskFilePath(ILSMIOOperation operation) {
+        FileReference target = operation.getTarget();
+        final String componentSequence = getComponentSequence(target.getFile().getAbsolutePath());
+        Path idxRelPath = Paths.get(target.getRelativePath()).getParent();
+        Path maskFileRelPath =
+                Paths.get(idxRelPath.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
+        return new FileReference(target.getDeviceHandle(), maskFileRelPath.toString());
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 9d934e4..c3b6229 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -24,6 +24,7 @@
 
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 
 public class ResourceReference {
 
@@ -126,4 +127,17 @@
     public String toString() {
         return getRelativePath().toString();
     }
+
+    /**
+     * Gets a component sequence based on its unique timestamp.
+     * e.g. a component file 1_3_b
+     * will return a component sequence 1_3
+     *
+     * @param componentFile any component file
+     * @return The component sequence
+     */
+    public static String getComponentSequence(String componentFile) {
+        final ResourceReference ref = of(componentFile);
+        return IndexComponentFileReference.of(ref.getName()).getSequence();
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index 181a75b..3f04bd2 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -28,10 +28,10 @@
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.replication.api.IReplicaTask;
 import org.apache.asterix.replication.api.IReplicationWorker;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
@@ -63,7 +63,7 @@
         final IIOManager ioManager = appCtx.getIoManager();
         final FileReference localPath = ioManager.resolve(componentFile);
         final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
-        final String componentSequence = PersistentLocalResourceRepository.getComponentSequence(componentFile);
+        final String componentSequence = ResourceReference.getComponentSequence(componentFile);
         return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 31f5171..145be86 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.resource;
 
+import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
 import static org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX;
 import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
 import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
@@ -593,19 +594,6 @@
         return Paths.get(resourceFile.getFile().getParentFile().getAbsolutePath(), METADATA_FILE_MASK_NAME);
     }
 
-    /**
-     * Gets a component sequence based on its unique timestamp.
-     * e.g. a component file 1_3_b
-     * will return a component sequence 1_3
-     *
-     * @param componentFile any component file
-     * @return The component sequence
-     */
-    public static String getComponentSequence(String componentFile) {
-        final ResourceReference ref = ResourceReference.of(componentFile);
-        return IndexComponentFileReference.of(ref.getName()).getSequence();
-    }
-
     private static boolean isComponentMask(File mask) {
         return mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index f8bc9f9..4991f86 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -58,7 +58,7 @@
     public static final int EC_IO_SCHEDULER_FAILED = 55;
     public static final int EC_HALT_SHUTDOWN_TIMED_OUT = 66;
     public static final int EC_HALT_WATCHDOG_FAILED = 77;
-    public static final int EC_FLUSH_FAILED = 88;
+    public static final int EC_IO_OPERATION_FAILED = 88;
     public static final int EC_TERMINATE_NC_SERVICE_DIRECTIVE = 99;
     private static final ExitThread exitThread = new ExitThread();
     private static final ShutdownWatchdog watchdogThread = new ShutdownWatchdog();
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
index 183cb6f..bc33e6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
@@ -21,18 +21,26 @@
 import java.util.concurrent.TimeUnit;
 
 public class Span {
-    private final long startNanos;
     private final long spanNanos;
+    private volatile long startNanos;
 
     private Span(long span, TimeUnit unit) {
-        startNanos = System.nanoTime();
         spanNanos = unit.toNanos(span);
+        reset();
+    }
+
+    public void reset() {
+        startNanos = System.nanoTime();
     }
 
     public long getSpanNanos() {
         return spanNanos;
     }
 
+    public long getSpan(TimeUnit unit) {
+        return unit.convert(spanNanos, TimeUnit.NANOSECONDS);
+    }
+
     public static Span start(long span, TimeUnit unit) {
         return new Span(span, unit);
     }
@@ -46,6 +54,15 @@
     }
 
     /**
+     * Sleep for the remainder of this span
+     *
+     * @throws InterruptedException
+     */
+    public void sleep() throws InterruptedException {
+        TimeUnit.NANOSECONDS.sleep(remaining(TimeUnit.NANOSECONDS));
+    }
+
+    /**
      * Sleep for the minimum of the duration or the remaining of this span
      *
      * @param sleep