[ASTERIXDB-2250] Clean up files after failed flush/merge

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Currently we didn't clean up component files if the flush/merge
operation fails. As a result, when a failure happens, the subsequent
retries must fail as well because the files already exist.
- This patch cleans up component files when there is exception thrown
during flush/merge operation
- Added a test case on failed merge

Change-Id: I94630613cfe68de9d5784e022ca3834de959aa02
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2300
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index e3424e5..7acc59f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 
 public class LSMBTreeFlushOperation extends FlushOperation {
     private final FileReference bloomFilterFlushTarget;
@@ -35,4 +36,9 @@
     public FileReference getBloomFilterTarget() {
         return bloomFilterFlushTarget;
     }
+
+    @Override
+    public LSMComponentFileReferences getComponentFiles() {
+        return new LSMComponentFileReferences(target, null, bloomFilterFlushTarget);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index edfa7e1..3aef4c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
 import org.apache.hyracks.storage.common.IIndexCursor;
 
@@ -38,4 +39,9 @@
     public FileReference getBloomFilterTarget() {
         return bloomFilterMergeTarget;
     }
+
+    @Override
+    public LSMComponentFileReferences getComponentFiles() {
+        return new LSMComponentFileReferences(target, null, bloomFilterMergeTarget);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
index 14cf778..dd4fcf5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
 import org.apache.hyracks.storage.common.IIndexCursor;
 
@@ -51,4 +52,8 @@
         return keepDeletedTuples;
     }
 
+    @Override
+    public LSMComponentFileReferences getComponentFiles() {
+        return new LSMComponentFileReferences(target, buddyBtreeMergeTarget, bloomFilterMergeTarget);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index ff32628..f5ee23b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 
 public interface ILSMIOOperation extends Callable<Boolean> {
 
@@ -67,4 +68,9 @@
      * @return the accessor of the operation
      */
     ILSMIndexAccessor getAccessor();
+
+    /**
+     * @return the component files produced by this operation
+     */
+    LSMComponentFileReferences getComponentFiles();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 749b3ba..6c1ef55 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -714,21 +714,77 @@
         ILSMIndexOperationContext opCtx = accessor.getOpContext();
         if (opCtx.getOperation() == IndexOperation.DELETE_MEMORY_COMPONENT) {
             return EmptyComponent.INSTANCE;
-        } else {
-            if (LOGGER.isInfoEnabled()) {
-                FlushOperation flushOp = (FlushOperation) operation;
-                LOGGER.log(Level.INFO, "Flushing component with id: " + flushOp.getFlushingComponent().getId());
-            }
-            return doFlush(operation);
         }
+        if (LOGGER.isInfoEnabled()) {
+            FlushOperation flushOp = (FlushOperation) operation;
+            LOGGER.log(Level.INFO, "Flushing component with id: " + flushOp.getFlushingComponent().getId());
+        }
+        ILSMDiskComponent component = null;
+        try {
+            component = doFlush(operation);
+            return component;
+        } catch (Exception e) {
+            LOGGER.error("Fail to execute flush " + this, e);
+            // clean up component
+            try {
+                cleanUpFiles(operation);
+            } catch (HyracksDataException e1) {
+                e.addSuppressed(e1);
+            }
+            throw HyracksDataException.create(e);
+        }
+
     }
 
     @Override
     public final ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
         ILSMIndexAccessor accessor = operation.getAccessor();
         ILSMIndexOperationContext opCtx = accessor.getOpContext();
-        return opCtx.getOperation() == IndexOperation.DELETE_DISK_COMPONENTS ? EmptyComponent.INSTANCE
-                : doMerge(operation);
+        ILSMDiskComponent component = null;
+        try {
+            component = opCtx.getOperation() == IndexOperation.DELETE_DISK_COMPONENTS ? EmptyComponent.INSTANCE
+                    : doMerge(operation);
+            return component;
+        } catch (Exception e) {
+            LOGGER.error("Fail to execute merge " + this, e);
+            // clean up component
+            try {
+                cleanUpFiles(operation);
+            } catch (HyracksDataException e1) {
+                e.addSuppressed(e1);
+            }
+            throw HyracksDataException.create(e);
+        }
+
+    }
+
+    protected void cleanUpFiles(ILSMIOOperation operation) throws HyracksDataException {
+        LSMComponentFileReferences componentFiles = operation.getComponentFiles();
+        if (componentFiles == null) {
+            return;
+        }
+        FileReference[] files = componentFiles.getFileReferences();
+        HyracksDataException exception = null;
+        for (FileReference file : files) {
+            try {
+                cleanUpFile(file);
+            } catch (HyracksDataException e) {
+                if (exception == null) {
+                    exception = e;
+                } else {
+                    exception.addSuppressed(e);
+                }
+            }
+        }
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    protected void cleanUpFile(FileReference file) throws HyracksDataException {
+        if (file != null) {
+            diskBufferCache.deleteFile(file);
+        }
     }
 
     protected abstract LSMComponentFileReferences getMergeFileReferences(ILSMDiskComponent firstComponent,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
index e809925..d835021 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 
-public class FlushOperation extends AbstractIoOperation implements Comparable<ILSMIOOperation> {
+public abstract class FlushOperation extends AbstractIoOperation implements Comparable<ILSMIOOperation> {
 
     public FlushOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
             String indexIdentifier) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
index 8e98087..4dd57eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
@@ -49,4 +49,8 @@
     public FileReference getBloomFilterFileReference() {
         return bloomFilterFileReference;
     }
+
+    public FileReference[] getFileReferences() {
+        return new FileReference[] { insertIndexFileReference, deleteIndexFileReference, bloomFilterFileReference };
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
index e16da5b..ec2305d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 
-public class MergeOperation extends AbstractIoOperation {
+public abstract class MergeOperation extends AbstractIoOperation {
     protected final IIndexCursor cursor;
 
     public MergeOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index 5fa6b4f..572e05c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -104,6 +104,11 @@
     public ILSMIndexAccessor getAccessor() {
         return ioOp.getAccessor();
     }
+
+    @Override
+    public LSMComponentFileReferences getComponentFiles() {
+        return ioOp.getComponentFiles();
+    }
 }
 
 class ComparableTracedIOOperation extends TracedIOOperation implements Comparable<ILSMIOOperation> {
@@ -132,4 +137,5 @@
                 + other.getClass().getSimpleName() + " in " + getClass().getSimpleName());
         return Integer.signum(hashCode() - other.hashCode());
     }
+
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index 2106f6a..5b272ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 
 public class LSMInvertedIndexFlushOperation extends FlushOperation {
     private final FileReference deletedKeysBTreeFlushTarget;
@@ -43,4 +44,9 @@
     public FileReference getBloomFilterTarget() {
         return bloomFilterFlushTarget;
     }
+
+    @Override
+    public LSMComponentFileReferences getComponentFiles() {
+        return new LSMComponentFileReferences(target, deletedKeysBTreeFlushTarget, bloomFilterFlushTarget);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 2c1db0f..408d9bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
 import org.apache.hyracks.storage.common.IIndexCursor;
 
@@ -44,4 +45,9 @@
     public FileReference getBloomFilterTarget() {
         return bloomFilterMergeTarget;
     }
+
+    @Override
+    public LSMComponentFileReferences getComponentFiles() {
+        return new LSMComponentFileReferences(target, deletedKeysBTreeMergeTarget, bloomFilterMergeTarget);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 6991c56..2306ebd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 
 public class LSMRTreeFlushOperation extends FlushOperation {
 
@@ -42,4 +43,9 @@
     public FileReference getBloomFilterTarget() {
         return bloomFilterFlushTarget;
     }
+
+    @Override
+    public LSMComponentFileReferences getComponentFiles() {
+        return new LSMComponentFileReferences(target, btreeFlushTarget, bloomFilterFlushTarget);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index 572ff01..aed5981 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
 import org.apache.hyracks.storage.common.IIndexCursor;
 
@@ -43,4 +44,9 @@
     public FileReference getBloomFilterTarget() {
         return bloomFilterMergeTarget;
     }
+
+    @Override
+    public LSMComponentFileReferences getComponentFiles() {
+        return new LSMComponentFileReferences(btreeMergeTarget, null, bloomFilterMergeTarget);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java
new file mode 100644
index 0000000..4c325c0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.btree;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.btree.OrderedIndexTestUtils;
+import org.apache.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
+import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeRangeSearchCursor;
+import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestContext;
+import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings("rawtypes")
+public class LSMBTreeMergeFailTest {
+
+    private final OrderedIndexTestUtils orderedIndexTestUtils = new OrderedIndexTestUtils();
+
+    private final LSMBTreeTestHarness harness = new LSMBTreeTestHarness();
+
+    private final TestIoScheduler scheduler = new TestIoScheduler();
+
+    @Before
+    public void setUp() throws HyracksDataException {
+        harness.setUp();
+    }
+
+    @After
+    public void tearDown() throws HyracksDataException {
+        harness.tearDown();
+    }
+
+    @Test
+    public void testMergeFail() throws Exception {
+        ISerializerDeserializer[] fieldSerdes =
+                { IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+        LSMBTreeTestContext ctx = createTestContext(fieldSerdes, 1, BTreeLeafFrameType.REGULAR_NSM, true);
+        LSMBTree btree = (LSMBTree) ctx.getIndex();
+        btree.create();
+        btree.activate();
+        ILSMIndexAccessor accessor = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+
+        ITupleReference tuple1 = TupleUtils.createIntegerTuple(1, 1, 1);
+        accessor.insert(tuple1);
+        // flush component
+        accessor.scheduleFlush(btree.getIOOperationCallback());
+
+        ITupleReference tuple2 = TupleUtils.createIntegerTuple(2, 2, 2);
+        accessor.insert(tuple2);
+        // flush component
+        accessor.scheduleFlush(btree.getIOOperationCallback());
+
+        ITupleReference tuple3 = TupleUtils.createIntegerTuple(3, 3, 3);
+        accessor.insert(tuple3);
+        // flush component
+        accessor.scheduleFlush(btree.getIOOperationCallback());
+
+        scheduler.modify = true;
+
+        boolean exceptionThrown = false;
+        try {
+            accessor.scheduleMerge(btree.getIOOperationCallback(), btree.getDiskComponents());
+        } catch (HyracksDataException e) {
+            exceptionThrown = true;
+        }
+        Assert.assertTrue(exceptionThrown);
+
+        scheduler.modify = false;
+        accessor.scheduleMerge(btree.getIOOperationCallback(), btree.getDiskComponents());
+        Assert.assertEquals(1, btree.getDiskComponents().size());
+
+        btree.deactivate();
+        btree.destroy();
+    }
+
+    protected LSMBTreeTestContext createTestContext(ISerializerDeserializer[] fieldSerdes, int numKeys,
+            BTreeLeafFrameType leafType, boolean filtered) throws Exception {
+        return LSMBTreeTestContext.create(harness.getIOManager(), harness.getVirtualBufferCaches(),
+                harness.getFileReference(), harness.getDiskBufferCache(), fieldSerdes, numKeys,
+                harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(), harness.getOperationTracker(),
+                scheduler, harness.getIOOperationCallbackFactory(), harness.getMetadataPageManagerFactory(), filtered,
+                true, false);
+    }
+
+    private class TestIoScheduler implements ILSMIOOperationScheduler {
+        boolean modify = false;
+
+        @Override
+        public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
+            if (modify) {
+                try {
+                    modifyOperation(operation);
+                } catch (Exception e) {
+                    throw HyracksDataException.create(e);
+                }
+            }
+            operation.call();
+        }
+
+        private void modifyOperation(ILSMIOOperation operation) throws Exception {
+            if (!(operation instanceof MergeOperation)) {
+                return;
+            }
+            Field field = MergeOperation.class.getDeclaredField("cursor");
+            field.setAccessible(true);
+            Field modifiersField = Field.class.getDeclaredField("modifiers");
+            modifiersField.setAccessible(true);
+            modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+            LSMBTreeRangeSearchCursor originalCursor = (LSMBTreeRangeSearchCursor) field.get(operation);
+            field.set(operation, new TestCursor(originalCursor.getOpCtx()));
+        }
+    }
+
+    private class TestCursor extends LSMBTreeRangeSearchCursor {
+        public TestCursor(ILSMIndexOperationContext opCtx) {
+            super(opCtx);
+        }
+
+        @Override
+        public boolean hasNext() throws HyracksDataException {
+            throw new UnsupportedOperationException();
+        }
+    }
+}