Merged fullstack_lsm_staging upto r3336

git-svn-id: https://hyracks.googlecode.com/svn/trunk@3339 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml
new file mode 100644
index 0000000..9656141
--- /dev/null
+++ b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml
@@ -0,0 +1,40 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-storage-am-lsm-common-test</artifactId>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks-tests</artifactId>
+    <version>0.2.4-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>  	
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-storage-am-lsm-common</artifactId>
+  		<version>0.2.4-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-test-support</artifactId>
+  		<version>0.2.4-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+</project>
diff --git a/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/DummyLSMIndexFileManager.java b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/DummyLSMIndexFileManager.java
new file mode 100644
index 0000000..69e23bc
--- /dev/null
+++ b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/DummyLSMIndexFileManager.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/** Test double that lists component files as-is, without opening or validating any of them. */
+public class DummyLSMIndexFileManager extends AbstractLSMIndexFileManager {
+    public DummyLSMIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
+            TreeIndexFactory<? extends ITreeIndex> treeFactory) {
+        super(ioManager, fileMapProvider, file, treeFactory, 0);
+    }
+
+    /** Appends every file in {@code dev}'s base directory that {@code filter} accepts to {@code allFiles}. */
+    protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
+            throws HyracksDataException, IndexException {
+        File dir = new File(dev.getPath(), baseDir);
+        String[] files = dir.list(filter);
+        // File.list() returns null when dir is missing or unreadable; treat that as "no files", not an NPE.
+        for (String fileName : files == null ? new String[0] : files) {
+            allFiles.add(new ComparableFileName(new FileReference(new File(dir, fileName))));
+        }
+    }
+}
diff --git a/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/DummyTreeFactory.java b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/DummyTreeFactory.java
new file mode 100644
index 0000000..8b22771
--- /dev/null
+++ b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/DummyTreeFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common;
+
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+
+/** Stub {@link TreeIndexFactory} whose {@code createIndexInstance} never builds a real index. */
+public class DummyTreeFactory extends TreeIndexFactory<ITreeIndex> {
+    public DummyTreeFactory() {
+        super(null, null, null, null, null, null, 0); // this stub supplies no real superclass arguments
+    }
+
+    /** Always returns {@code null}; the {@code file} argument is ignored. */
+    @Override
+    public ITreeIndex createIndexInstance(FileReference file) throws IndexException {
+        return null;
+    }
+}
diff --git a/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/InMemoryBufferCacheTest.java b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/InMemoryBufferCacheTest.java
new file mode 100644
index 0000000..adba93d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/InMemoryBufferCacheTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashSet;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
+
+/** Checks that InMemoryBufferCache hands out a distinct page per pin, within and beyond its capacity. */
+public class InMemoryBufferCacheTest {
+    private static final int PAGE_SIZE = 256;
+    private static final int NUM_PAGES = 100;
+    private HashSet<ICachedPage> pinnedPages = new HashSet<ICachedPage>();
+
+    @Test
+    public void test01() throws Exception {
+        InMemoryBufferCache cache = new InMemoryBufferCache(new HeapBufferAllocator(), PAGE_SIZE, NUM_PAGES,
+                new TransientFileMapManager());
+        cache.open();
+        try {
+            int dummyFileId = 0;
+            // Pin every in-capacity page: each must be a unique ICachedPage and none may count as overflow.
+            for (int i = 0; i < NUM_PAGES; i++) {
+                ICachedPage page = cache.pin(BufferedFileHandle.getDiskPageId(dummyFileId, i), false);
+                if (!pinnedPages.add(page)) {
+                    fail("Id collision for ICachedPage, caused by id: " + i);
+                }
+                assertEquals(0, cache.getNumOverflowPages());
+            }
+            // Pin above capacity: should be very rare in practice, but must succeed with new overflow pages.
+            for (int i = 0; i < 100; i++) {
+                ICachedPage page = cache.pin(BufferedFileHandle.getDiskPageId(dummyFileId, i + NUM_PAGES), false);
+                if (!pinnedPages.add(page)) {
+                    fail("Id collision for ICachedPage, caused by overflow id: " + i);
+                }
+                assertEquals(i + 1, cache.getNumOverflowPages());
+            }
+        } finally {
+            cache.close(); // release the cache even when an assertion above fails
+        }
+    }
+}
diff --git a/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/InMemoryFreePageManagerTest.java b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/InMemoryFreePageManagerTest.java
new file mode 100644
index 0000000..bd09a3f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/InMemoryFreePageManagerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+
+/** Checks page-id allocation and isFull() reporting of InMemoryFreePageManager, before and after reset(). */
+public class InMemoryFreePageManagerTest {
+
+    private static final int NUM_PAGES = 100;
+
+    /** Drains the manager to capacity and then 100 pages past it, asserting sequential ids and isFull(). */
+    private void testInMemoryFreePageManager(InMemoryFreePageManager memFreePageManager) throws HyracksDataException {
+        // The first two pages are reserved for the BTree's metadata page and root page,
+        // so the "actual" capacity is numPages - 2.
+        int capacity = memFreePageManager.getCapacity();
+        assertEquals(NUM_PAGES - 2, capacity);
+        for (int i = 0; i < capacity; i++) {
+            int pageId = memFreePageManager.getFreePage(null);
+            // The free pages start from page 2;
+            assertEquals(i + 2, pageId);
+            assertFalse(memFreePageManager.isFull());
+        }
+        // Ask for 100 pages above the capacity. This should be very rare in practice, but must
+        // nevertheless succeed, and we expect isFull() to return true for each of them.
+        for (int i = 0; i < 100; i++) {
+            int pageId = memFreePageManager.getFreePage(null);
+            assertEquals(capacity + i + 2, pageId);
+            assertTrue(memFreePageManager.isFull());
+        }
+    }
+
+    /** Runs the allocation scenario on a fresh manager and again after reset(). */
+    @Test
+    public void test01() throws HyracksDataException {
+        ITreeIndexMetaDataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
+        InMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(NUM_PAGES, metaFrameFactory);
+        testInMemoryFreePageManager(memFreePageManager);
+        // We expect exactly the same behavior after a reset().
+        memFreePageManager.reset();
+        testInMemoryFreePageManager(memFreePageManager);
+    }
+}
diff --git a/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMIndexFileManagerTest.java b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMIndexFileManagerTest.java
new file mode 100644
index 0000000..161f4ce
--- /dev/null
+++ b/fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMIndexFileManagerTest.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
+
+public class LSMIndexFileManagerTest {
+    private static final int DEFAULT_PAGE_SIZE = 256;
+    private static final int DEFAULT_NUM_PAGES = 100;
+    private static final int DEFAULT_MAX_OPEN_FILES = 10;
+    protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
+    protected final static String sep = System.getProperty("file.separator");
+    protected IOManager ioManager;
+    protected IFileMapProvider fileMapProvider;
+    protected String baseDir;
+    protected FileReference file;
+
+    @Before
+    public void setUp() throws HyracksException {
+        TestStorageManagerComponentHolder.init(DEFAULT_PAGE_SIZE, DEFAULT_NUM_PAGES, DEFAULT_MAX_OPEN_FILES);
+        ioManager = TestStorageManagerComponentHolder.getIOManager();
+        fileMapProvider = TestStorageManagerComponentHolder.getFileMapProvider(null);
+        baseDir = "lsm_tree" + simpleDateFormat.format(new Date()) + sep; // timestamped, relative directory name
+        File f = new File(baseDir);
+        f.mkdirs(); // relative path, so created under the working directory; the result is not checked
+        file = new FileReference(f);
+    }
+
+    /** Schedules the per-test base directory for deletion when the JVM exits. */
+    @After
+    public void tearDown() throws HyracksDataException {
+        new File(baseDir).deleteOnExit();
+    }
+
+    /**
+     * Generates flush (or, if testFlushFileName is false, merge) file names, newest first, and checks that
+     * sorting them with the file manager's comparator leaves that order unchanged.
+     */
+    public void sortOrderTest(boolean testFlushFileName) throws InterruptedException, HyracksDataException {
+        ILSMIndexFileManager fileManager = new DummyLSMIndexFileManager(ioManager, fileMapProvider, file,
+                new DummyTreeFactory());
+        LinkedList<String> fileNames = new LinkedList<String>();
+
+        int numFileNames = 100;
+        long sleepTime = 5;
+        for (int i = 0; i < numFileNames; i++) {
+            String flushFileName = fileManager.getRelFlushFileReference().getInsertIndexFileReference().getFile()
+                    .getName();
+            if (testFlushFileName) {
+                fileNames.addFirst(flushFileName);
+                Thread.sleep(sleepTime);
+            } else {
+                Thread.sleep(sleepTime);
+                String secondFlushFileName = fileManager.getRelFlushFileReference()
+                        .getInsertIndexFileReference().getFile().getName();
+                fileNames.addFirst(getMergeFileName(fileManager, flushFileName, secondFlushFileName));
+                Thread.sleep(sleepTime);
+            }
+        }
+
+        List<String> sortedFileNames = new ArrayList<String>(fileNames);
+        Comparator<String> cmp = fileManager.getFileNameComparator();
+        Collections.sort(sortedFileNames, cmp);
+        // The comparator must yield the reverse insertion order, which is how fileNames was built.
+        for (int i = 0; i < numFileNames; i++) {
+            assertEquals(fileNames.get(i), sortedFileNames.get(i));
+        }
+    }
+
+    @Test
+    public void flushAndMergeFilesSortOrderTest() throws InterruptedException, HyracksDataException {
+        sortOrderTest(true); // sort order of flush file names
+        sortOrderTest(false); // sort order of merge file names
+    }
+
+    /** Checks that cleanupAndGetValidFiles() keeps only files not subsumed by a merge and deletes the rest. */
+    public void cleanInvalidFilesTest(IOManager ioManager) throws InterruptedException, IOException, IndexException {
+        ILSMIndexFileManager fileManager = new DummyLSMIndexFileManager(ioManager, fileMapProvider, file,
+                new DummyTreeFactory());
+        fileManager.createDirs();
+
+        List<FileReference> flushFiles = new ArrayList<FileReference>();
+        List<FileReference> allFiles = new ArrayList<FileReference>();
+
+        int numFileNames = 100;
+        long sleepTime = 5;
+        // Generate a bunch of flush files.
+        for (int i = 0; i < numFileNames; i++) {
+            LSMComponentFileReferences relFlushFileRefs = fileManager.getRelFlushFileReference();
+            flushFiles.add(relFlushFileRefs.getInsertIndexFileReference());
+            Thread.sleep(sleepTime);
+        }
+        allFiles.addAll(flushFiles);
+
+        // Simulate merging some of the flush files, starting with range 0 to 4.
+        FileReference mergeFile1 = simulateMerge(fileManager, flushFiles.get(0), flushFiles.get(4));
+        allFiles.add(mergeFile1);
+        // Merge range 5 to 9.
+        FileReference mergeFile2 = simulateMerge(fileManager, flushFiles.get(5), flushFiles.get(9));
+        allFiles.add(mergeFile2);
+        // Merge range 10 to 19.
+        FileReference mergeFile3 = simulateMerge(fileManager, flushFiles.get(10), flushFiles.get(19));
+        allFiles.add(mergeFile3);
+        // Merge range 20 to 29.
+        FileReference mergeFile4 = simulateMerge(fileManager, flushFiles.get(20), flushFiles.get(29));
+        allFiles.add(mergeFile4);
+        // Merge range 50 to 79.
+        FileReference mergeFile5 = simulateMerge(fileManager, flushFiles.get(50), flushFiles.get(79));
+        allFiles.add(mergeFile5);
+
+        // Simulate merging of merge files.
+        FileReference mergeFile6 = simulateMerge(fileManager, mergeFile1, mergeFile2);
+        allFiles.add(mergeFile6);
+        FileReference mergeFile7 = simulateMerge(fileManager, mergeFile3, mergeFile4);
+        allFiles.add(mergeFile7);
+
+        // Create all files and set delete on exit for all files.
+        for (FileReference fileRef : allFiles) {
+            fileRef.getFile().createNewFile();
+            fileRef.getFile().deleteOnExit();
+        }
+
+        // Populate expected valid flush files.
+        List<String> expectedValidFiles = new ArrayList<String>();
+        for (int i = 30; i < 50; i++) {
+            expectedValidFiles.add(flushFiles.get(i).getFile().getName());
+        }
+        for (int i = 80; i < 100; i++) {
+            expectedValidFiles.add(flushFiles.get(i).getFile().getName());
+        }
+
+        // Populate expected valid merge files.
+        expectedValidFiles.add(mergeFile5.getFile().getName());
+        expectedValidFiles.add(mergeFile6.getFile().getName());
+        expectedValidFiles.add(mergeFile7.getFile().getName());
+
+        // Sort expected files.
+        Collections.sort(expectedValidFiles, fileManager.getFileNameComparator());
+
+        // Ask the file manager for the valid files. We don't test for physical consistency in this test.
+        List<LSMComponentFileReferences> lsmComonentFileReference = fileManager.cleanupAndGetValidFiles();
+
+        // Check actual files against expected files.
+        assertEquals(expectedValidFiles.size(), lsmComonentFileReference.size());
+        for (int i = 0; i < expectedValidFiles.size(); i++) {
+            assertEquals(expectedValidFiles.get(i), lsmComonentFileReference.get(i).getInsertIndexFileReference()
+                    .getFile().getName());
+        }
+
+        // Make sure invalid files were removed from all IODevices.
+        ArrayList<String> remainingFiles = new ArrayList<String>();
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            File dir = new File(dev.getPath(), baseDir);
+            FilenameFilter filter = new FilenameFilter() {
+                public boolean accept(File dir, String name) {
+                    return !name.startsWith(".");
+                }
+            };
+            String[] files = dir.list(filter);
+            for (String file : files) { // NOTE(review): this local shadows the 'file' field
+                File f = new File(file); // only used to extract the bare file name
+                remainingFiles.add(f.getName());
+            }
+        }
+
+        Collections.sort(remainingFiles, fileManager.getFileNameComparator());
+        // Check actual files in directory against expected files.
+        assertEquals(expectedValidFiles.size(), remainingFiles.size());
+        for (int i = 0; i < expectedValidFiles.size(); i++) {
+            assertEquals(expectedValidFiles.get(i), remainingFiles.get(i));
+        }
+    }
+
+    @Test
+    public void singleIODeviceTest() throws InterruptedException, IOException, IndexException {
+        IOManager singleDeviceIOManager = createIOManager(1);
+        cleanInvalidFilesTest(singleDeviceIOManager);
+        cleanDirs(singleDeviceIOManager); // NOTE(review): not reached if the test above throws
+    }
+
+    @Test
+    public void twoIODevicesTest() throws InterruptedException, IOException, IndexException {
+        IOManager twoDevicesIOManager = createIOManager(2);
+        cleanInvalidFilesTest(twoDevicesIOManager);
+        cleanDirs(twoDevicesIOManager); // NOTE(review): not reached if the test above throws
+    }
+
+    @Test
+    public void fourIODevicesTest() throws InterruptedException, IOException, IndexException {
+        IOManager fourDevicesIOManager = createIOManager(4);
+        cleanInvalidFilesTest(fourDevicesIOManager);
+        cleanDirs(fourDevicesIOManager); // NOTE(review): not reached if the test above throws
+    }
+
+    /** Deletes every non-hidden file found in the base directory of each of ioManager's IO devices. */
+    private void cleanDirs(IOManager ioManager) {
+        FilenameFilter nonHidden = new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return !name.startsWith(".");
+            }
+        };
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            File dir = new File(dev.getPath(), baseDir);
+            String[] names = dir.list(nonHidden); // bare names, or null when dir cannot be listed
+            for (String name : names == null ? new String[0] : names) {
+                new File(dir, name).delete(); // resolve against dir; new File(name) would point into the CWD
+            }
+        }
+    }
+
+    private IOManager createIOManager(int numDevices) throws HyracksException {
+        String tmpDir = System.getProperty("java.io.tmpdir"); // every device directory lives under here
+        List<IODeviceHandle> deviceHandles = new ArrayList<IODeviceHandle>();
+        for (int devIdx = 0; devIdx < numDevices; devIdx++) {
+            deviceHandles.add(new IODeviceHandle(new File(tmpDir + sep + "test_iodev" + devIdx), "wa"));
+        }
+        return new IOManager(deviceHandles, Executors.newCachedThreadPool());
+    }
+
+    /** Requests the merge file references for the names of a and b and returns their insert-index file. */
+    private FileReference simulateMerge(ILSMIndexFileManager fileManager, FileReference a, FileReference b)
+            throws HyracksDataException {
+        return fileManager.getRelMergeFileReference(a.getFile().getName(), b.getFile().getName())
+                .getInsertIndexFileReference();
+    }
+
+    /** Returns the name of the merge file requested for the base names of firstFile and lastFile. */
+    private String getMergeFileName(ILSMIndexFileManager fileNameManager, String firstFile, String lastFile)
+            throws HyracksDataException {
+        String first = new File(firstFile).getName();
+        String last = new File(lastFile).getName();
+        return fileNameManager.getRelMergeFileReference(first, last).getInsertIndexFileReference().getFile().getName();
+    }
+}