Fix for asterix issue 630.
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
index 6e17ff8..9c2268d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
@@ -23,21 +23,20 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 
 /**
- * Quick & dirty data generator for multi-thread testing. 
- *
+ * Quick & dirty data generator for multi-thread testing.
  */
 @SuppressWarnings("rawtypes")
 public class DataGenThread extends Thread {
     public final BlockingQueue<TupleBatch> tupleBatchQueue;
     private final int maxNumBatches;
-    private final int maxOutstandingBatches;        
+    private final int maxOutstandingBatches;
     private int numBatches = 0;
     private final Random rnd;
-    
+
     // maxOutstandingBatches pre-created tuple-batches for populating the queue.
     private TupleBatch[] tupleBatches;
     private int ringPos;
-    
+
     public DataGenThread(int numConsumers, int maxNumBatches, int batchSize, ISerializerDeserializer[] fieldSerdes,
             int payloadSize, int rndSeed, int maxOutstandingBatches, boolean sorted) {
         this.maxNumBatches = maxNumBatches;
@@ -51,7 +50,7 @@
         tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(maxOutstandingBatches);
         ringPos = 0;
     }
-    
+
     public DataGenThread(int numConsumers, int maxNumBatches, int batchSize, ISerializerDeserializer[] fieldSerdes,
             IFieldValueGenerator[] fieldGens, int rndSeed, int maxOutstandingBatches) {
         this.maxNumBatches = maxNumBatches;
@@ -64,13 +63,13 @@
         tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(maxOutstandingBatches);
         ringPos = 0;
     }
-    
+
     @Override
     public void run() {
-        while(numBatches < maxNumBatches) {
+        while (numBatches < maxNumBatches) {
             boolean added = false;
             try {
-                if (tupleBatches[ringPos].inUse.compareAndSet(false, true)) {                    
+                if (tupleBatches[ringPos].inUse.compareAndSet(false, true)) {
                     tupleBatches[ringPos].generate();
                     tupleBatchQueue.put(tupleBatches[ringPos]);
                     added = true;
@@ -89,11 +88,11 @@
             }
         }
     }
-    
+
     public TupleBatch getBatch() throws InterruptedException {
         return tupleBatchQueue.take();
     }
-    
+
     public void releaseBatch(TupleBatch batch) {
         batch.inUse.set(false);
     }
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
index d7234fa..cb9c545 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
@@ -23,7 +23,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 
-@SuppressWarnings("rawtypes") 
+@SuppressWarnings("rawtypes")
 public class DataGenUtils {
     public static IFieldValueGenerator getFieldGenFromSerde(ISerializerDeserializer serde, Random rnd, boolean sorted) {
         if (serde instanceof IntegerSerializerDeserializer) {
@@ -49,8 +49,9 @@
         }
         return null;
     }
-    
-    public static IFieldValueGenerator[] getFieldGensFromSerdes(ISerializerDeserializer[] serdes, Random rnd, boolean sorted) {
+
+    public static IFieldValueGenerator[] getFieldGensFromSerdes(ISerializerDeserializer[] serdes, Random rnd,
+            boolean sorted) {
         IFieldValueGenerator[] fieldValueGens = new IFieldValueGenerator[serdes.length];
         for (int i = 0; i < serdes.length; i++) {
             fieldValueGens[i] = getFieldGenFromSerde(serdes[i], rnd, sorted);
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 3e51e20..28f44e6 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -54,7 +54,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
@@ -164,8 +163,7 @@
         }
 
         if (flushOnExit) {
-            BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
-                    ioOpCallback);
+            BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
             ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             accessor.scheduleFlush(cb);
             try {
@@ -370,7 +368,8 @@
         opCtx.getComponentHolder().add(flushingComponent);
         ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
-                .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
+                .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback, fileManager
+                .getBaseDir()));
     }
 
     @Override
@@ -442,7 +441,8 @@
                 .getName(), lastFile.getFile().getName());
         ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
-                .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
+                .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager
+                .getBaseDir()));
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index 668e727..94b1569 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -27,21 +27,24 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
 
-public class LSMBTreeFlushOperation implements ILSMIOOperation {
+public class LSMBTreeFlushOperation implements ILSMIOOperation, Comparable<LSMBTreeFlushOperation> {
 
     private final ILSMIndexAccessorInternal accessor;
     private final ILSMComponent flushingComponent;
     private final FileReference btreeFlushTarget;
     private final FileReference bloomFilterFlushTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMBTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
-            FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+            FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback,
+            String indexIdentifier) {
         this.accessor = accessor;
         this.flushingComponent = flushingComponent;
         this.btreeFlushTarget = btreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -83,4 +86,19 @@
     public ILSMComponent getFlushingComponent() {
         return flushingComponent;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.FLUSH;
+    }
+
+    @Override
+    public int compareTo(LSMBTreeFlushOperation o) {
+        return btreeFlushTarget.getFile().getName().compareTo(o.getBTreeFlushTarget().getFile().getName());
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index 3a608fe..a4f6875 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -37,16 +37,18 @@
     private final FileReference btreeMergeTarget;
     private final FileReference bloomFilterMergeTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMBTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
             ITreeIndexCursor cursor, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget,
-            ILSMIOOperationCallback callback) {
+            ILSMIOOperationCallback callback, String indexIdentifier) {
         this.accessor = accessor;
         this.mergingComponents = mergingComponents;
         this.cursor = cursor;
         this.btreeMergeTarget = btreeMergeTarget;
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -94,4 +96,14 @@
     public List<ILSMComponent> getMergingComponents() {
         return mergingComponents;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.MERGE;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 2c3940f..1638a5d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -22,6 +22,12 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 
 public interface ILSMIOOperation extends Callable<Boolean> {
+
+    public enum LSMIOOpertionType {
+        FLUSH,
+        MERGE
+    }
+
     public Set<IODeviceHandle> getReadDevices();
 
     public Set<IODeviceHandle> getWriteDevices();
@@ -29,4 +35,8 @@
     public Boolean call() throws HyracksDataException, IndexException;
 
     public ILSMIOOperationCallback getCallback();
+
+    public String getIndexUniqueIdentifier();
+
+    public LSMIOOpertionType getIOOpertionType();
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index 25894f1..b785764 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -14,24 +14,87 @@
  */
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOpertionType;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 
 public class AsynchronousScheduler implements ILSMIOOperationScheduler {
+    // Since this is a asynchronous scheduler, we make sure that flush operations coming from the same lsm index
+    // will be executed serially in same order of scheduling the operations. Look at asterix issue 630.
+
     public final static AsynchronousScheduler INSTANCE = new AsynchronousScheduler();
     private ExecutorService executor;
+    private final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<String, ILSMIOOperation>();
+    private final Map<String, PriorityQueue<ILSMIOOperation>> waitingFlushOperations = new HashMap<String, PriorityQueue<ILSMIOOperation>>();
 
     public void init(ThreadFactory threadFactory) {
-        executor = Executors.newCachedThreadPool(threadFactory);
+        // Creating an executor with the same configuration of Executors.newCachedThreadPool. 
+        executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>(), threadFactory) {
+
+            @Override
+            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+                return new LSMIOOperationTask<T>(callable);
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            protected void afterExecute(Runnable r, Throwable t) {
+                super.afterExecute(r, t);
+                LSMIOOperationTask<Boolean> task = (LSMIOOperationTask<Boolean>) r;
+                ILSMIOOperation executedOp = task.getOperation();
+                String id = executedOp.getIndexUniqueIdentifier();
+                synchronized (this) {
+                    runningFlushOperations.remove(id);
+                    if (waitingFlushOperations.containsKey(id)) {
+                        try {
+                            ILSMIOOperation op = waitingFlushOperations.get(id).poll();
+                            if (op != null) {
+                                scheduleOperation(op);
+                            } else {
+                                waitingFlushOperations.remove(id);
+                            }
+                        } catch (HyracksDataException e) {
+                            t = e.getCause();
+                        }
+                    }
+                }
+            }
+        };
     }
 
     @Override
     public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
-        executor.submit(operation);
+        if (operation.getIOOpertionType() == LSMIOOpertionType.MERGE) {
+            executor.submit(operation);
+        } else {
+            String id = operation.getIndexUniqueIdentifier();
+            synchronized (executor) {
+                if (runningFlushOperations.containsKey(id)) {
+                    if (waitingFlushOperations.containsKey(id)) {
+                        waitingFlushOperations.get(id).offer(operation);
+                    } else {
+                        PriorityQueue<ILSMIOOperation> q = new PriorityQueue<ILSMIOOperation>();
+                        q.offer(operation);
+                        waitingFlushOperations.put(id, q);
+                    }
+                } else {
+                    runningFlushOperations.put(id, operation);
+                    executor.submit(operation);
+                }
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java
new file mode 100644
index 0000000..9743afe
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+
+public class LSMIOOperationTask<T> extends FutureTask<T> {
+    private final ILSMIOOperation operation;
+
+    public LSMIOOperationTask(Callable<T> callable) {
+        super(callable);
+        this.operation = (ILSMIOOperation) callable;
+    }
+
+    public ILSMIOOperation getOperation() {
+        return operation;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 0600754..93c5a58 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -438,7 +438,7 @@
         ioScheduler.scheduleOperation(new LSMInvertedIndexFlushOperation(
                 new LSMInvertedIndexAccessor(lsmHarness, opCtx), flushingComponent, componentFileRefs
                         .getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
-                componentFileRefs.getBloomFilterFileReference(), callback));
+                componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
     }
 
     @Override
@@ -543,7 +543,7 @@
         ILSMIndexAccessorInternal accessor = new LSMInvertedIndexAccessor(lsmHarness, ctx);
         ioScheduler.scheduleOperation(new LSMInvertedIndexMergeOperation(accessor, mergingComponents, cursor,
                 relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
-                relMergeFileRefs.getBloomFilterFileReference(), callback));
+                relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index 45433e7..beb7643 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -28,23 +28,25 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
 
-public class LSMInvertedIndexFlushOperation implements ILSMIOOperation {
+public class LSMInvertedIndexFlushOperation implements ILSMIOOperation, Comparable<LSMInvertedIndexFlushOperation> {
     private final ILSMIndexAccessorInternal accessor;
     private final ILSMComponent flushingComponent;
     private final FileReference dictBTreeFlushTarget;
     private final FileReference deletedKeysBTreeFlushTarget;
     private final FileReference bloomFilterFlushTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMInvertedIndexFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
             FileReference dictBTreeFlushTarget, FileReference deletedKeysBTreeFlushTarget,
-            FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+            FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
         this.accessor = accessor;
         this.flushingComponent = flushingComponent;
         this.dictBTreeFlushTarget = dictBTreeFlushTarget;
         this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -88,4 +90,19 @@
     public ILSMComponent getFlushingComponent() {
         return flushingComponent;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.FLUSH;
+    }
+
+    @Override
+    public int compareTo(LSMInvertedIndexFlushOperation o) {
+        return dictBTreeFlushTarget.getFile().getName().compareTo(o.getDictBTreeFlushTarget().getFile().getName());
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 7cd921a..5a1e413 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -38,10 +38,11 @@
     private final FileReference deletedKeysBTreeMergeTarget;
     private final FileReference bloomFilterMergeTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMInvertedIndexMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
             IIndexCursor cursor, FileReference dictBTreeMergeTarget, FileReference deletedKeysBTreeMergeTarget,
-            FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback) {
+            FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
         this.accessor = accessor;
         this.mergingComponents = mergingComponents;
         this.cursor = cursor;
@@ -49,6 +50,7 @@
         this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget;
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -103,4 +105,14 @@
     public List<ILSMComponent> getMergingComponents() {
         return mergingComponents;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.MERGE;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 662aa02..3d2cc62 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -177,7 +177,7 @@
         LSMRTreeAccessor accessor = new LSMRTreeAccessor(lsmHarness, rctx);
         ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs
                 .getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs
-                .getBloomFilterFileReference(), callback));
+                .getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
     }
 
     @Override
@@ -295,7 +295,8 @@
         ILSMIndexAccessorInternal accessor = new LSMRTreeAccessor(lsmHarness, rctx);
         ioScheduler.scheduleOperation(new LSMRTreeMergeOperation((ILSMIndexAccessorInternal) accessor,
                 mergingComponents, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs
-                        .getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
+                        .getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback,
+                fileManager.getBaseDir()));
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 18d7a7e..0d40eb4 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -27,7 +27,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
 
-public class LSMRTreeFlushOperation implements ILSMIOOperation {
+public class LSMRTreeFlushOperation implements ILSMIOOperation, Comparable<LSMRTreeFlushOperation> {
 
     private final ILSMIndexAccessorInternal accessor;
     private final ILSMComponent flushingComponent;
@@ -35,16 +35,18 @@
     private final FileReference btreeFlushTarget;
     private final FileReference bloomFilterFlushTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
             FileReference rtreeFlushTarget, FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget,
-            ILSMIOOperationCallback callback) {
+            ILSMIOOperationCallback callback, String indexIdentifier) {
         this.accessor = accessor;
         this.flushingComponent = flushingComponent;
         this.rtreeFlushTarget = rtreeFlushTarget;
         this.btreeFlushTarget = btreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -89,4 +91,19 @@
     public ILSMComponent getFlushingComponent() {
         return flushingComponent;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.FLUSH;
+    }
+
+    @Override
+    public int compareTo(LSMRTreeFlushOperation o) {
+        return rtreeFlushTarget.getFile().getName().compareTo(o.getRTreeFlushTarget().getFile().getName());
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index da7a2fb..ddf3ebf 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -36,10 +36,11 @@
     private final FileReference btreeMergeTarget;
     private final FileReference bloomFilterMergeTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMRTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
             ITreeIndexCursor cursor, FileReference rtreeMergeTarget, FileReference btreeMergeTarget,
-            FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback) {
+            FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
         this.accessor = accessor;
         this.mergingComponents = mergingComponents;
         this.cursor = cursor;
@@ -47,6 +48,7 @@
         this.btreeMergeTarget = btreeMergeTarget;
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -104,4 +106,14 @@
     public List<ILSMComponent> getMergingComponents() {
         return mergingComponents;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.MERGE;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index ec66a39a..bfd0260 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -155,7 +155,7 @@
         opCtx.getComponentHolder().add(flushingComponent);
         ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, relFlushFileRefs
-                .getInsertIndexFileReference(), null, null, callback));
+                .getInsertIndexFileReference(), null, null, callback, fileManager.getBaseDir()));
     }
 
     @Override
@@ -256,7 +256,7 @@
         LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
         ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, rctx);
         ioScheduler.scheduleOperation(new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
-                .getInsertIndexFileReference(), null, null, callback));
+                .getInsertIndexFileReference(), null, null, callback, fileManager.getBaseDir()));
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 3591f78a..fb06a7e 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -20,10 +20,12 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -37,9 +39,9 @@
 import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -50,8 +52,8 @@
 
 public class LSMTreeRunner implements IExperimentRunner {
 
-    private static final int MAX_OPEN_FILES = 10000;
-    private static final int HYRACKS_FRAME_SIZE = 128;
+    private static final int MAX_OPEN_FILES = Integer.MAX_VALUE;
+    private static final int HYRACKS_FRAME_SIZE = 131072;
 
     protected IHyracksTaskContext ctx;
     protected IOManager ioManager;
@@ -61,7 +63,7 @@
 
     protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
     protected final static String sep = System.getProperty("file.separator");
-    protected final static String classDir = "/lsmtree/";
+    protected final static String classDir = "/tmp/lsmtree/";
     protected String onDiskDir;
     protected FileReference file;
 
@@ -71,6 +73,11 @@
     protected IBufferCache memBufferCache;
     private final int onDiskPageSize;
     private final int onDiskNumPages;
+    private final static ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            return new Thread(r);
+        }
+    };
 
     public LSMTreeRunner(int numBatches, int inMemPageSize, int inMemNumPages, int onDiskPageSize, int onDiskNumPages,
             ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields,
@@ -93,11 +100,13 @@
         List<IVirtualBufferCache> virtualBufferCaches = new ArrayList<IVirtualBufferCache>();
         for (int i = 0; i < 2; i++) {
             IVirtualBufferCache virtualBufferCache = new VirtualBufferCache(new HeapBufferAllocator(), inMemPageSize,
-                    inMemNumPages);
+                    inMemNumPages / 2);
             virtualBufferCaches.add(virtualBufferCache);
         }
 
-        this.ioScheduler = SynchronousScheduler.INSTANCE;
+        this.ioScheduler = AsynchronousScheduler.INSTANCE;
+        AsynchronousScheduler.INSTANCE.init(threadFactory);
+
         lsmtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fmp, typeTraits, cmpFactories,
                 bloomFilterKeyFields, bloomFilterFalsePositiveRate, NoMergePolicy.INSTANCE,
                 new ThreadCountingTracker(), ioScheduler, NoOpIOOperationCallback.INSTANCE);
@@ -133,15 +142,23 @@
 
     @Override
     public void reset() throws Exception {
+        try {
+            lsmtree.deactivate();
+        } catch (HyracksDataException e) {
+            // ignore
+        }
+        try {
+            lsmtree.destroy();
+        } catch (HyracksDataException e) {
+            // ignore
+        }
+
         lsmtree.create();
+        lsmtree.activate();
     }
 
     @Override
     public void deinit() throws Exception {
-        bufferCache.closeFile(lsmtreeFileId);
-        bufferCache.close();
-        memBufferCache.closeFile(lsmtreeFileId);
-        memBufferCache.close();
     }
 
     public class LSMTreeThread extends Thread {
@@ -166,6 +183,7 @@
                         } catch (TreeIndexException e) {
                         }
                     }
+                    dataGen.releaseBatch(batch);
                 }
             } catch (Exception e) {
                 e.printStackTrace();
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
index 641362e..2354550 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
@@ -31,18 +31,20 @@
     public static void main(String[] args) throws Exception {
         // Disable logging so we can better see the output times.
         Enumeration<String> loggers = LogManager.getLogManager().getLoggerNames();
-        while(loggers.hasMoreElements()) {
+        while (loggers.hasMoreElements()) {
             String loggerName = loggers.nextElement();
             Logger logger = LogManager.getLogManager().getLogger(loggerName);
             logger.setLevel(Level.OFF);
         }
-        
-        int numTuples = 100000; // 100K
+        boolean sorted = Boolean.parseBoolean(args[0]);
+        int numThreads =  Integer.parseInt(args[1]);
+
+        //int numTuples = 100000; // 100K
         //int numTuples = 1000000; // 1M
         //int numTuples = 2000000; // 2M
         //int numTuples = 3000000; // 3M
         //int numTuples = 10000000; // 10M
-        //int numTuples = 20000000; // 20M
+        int numTuples = 20000000; // 20M
         //int numTuples = 30000000; // 30M
         //int numTuples = 40000000; // 40M
         //int numTuples = 60000000; // 60M
@@ -50,27 +52,41 @@
         //int numTuples = 200000000; // 200M
         int batchSize = 10000;
         int numBatches = numTuples / batchSize;
-        
+
+        int payLoadSize = 240;
         ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE };
-        ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes, 30);
-        
-        IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes, fieldSerdes.length);
-        
+        ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes, payLoadSize);
+
+        IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes,
+                fieldSerdes.length);
+        int[] bloomFilterKeyFields = new int[cmpFactories.length];
+        for (int i = 0; i < bloomFilterKeyFields.length; i++) {
+            bloomFilterKeyFields[i] = i;
+        }
+        double bloomFilterFalsePositiveRate = 0.01;
+
         //int repeats = 1000;
         int repeats = 1;
         long[] times = new long[repeats];
 
-        int numThreads = 2;
+//        int numThreads = 4;
+//        boolean sorted = true;
         for (int i = 0; i < repeats; i++) {
             //ConcurrentSkipListRunner runner = new ConcurrentSkipListRunner(numBatches, batchSize, tupleSize, typeTraits, cmp);
-            InMemoryBTreeRunner runner = new InMemoryBTreeRunner(numBatches, 8192, 100000, typeTraits, cmpFactories);
+            //InMemoryBTreeRunner runner = new InMemoryBTreeRunner(numBatches, 8192, 100000, typeTraits, cmpFactories);
             //BTreeBulkLoadRunner runner = new BTreeBulkLoadRunner(numBatches, 8192, 100000, typeTraits, cmp, 1.0f);
-        	//BTreeRunner runner = new BTreeRunner(numBatches, 8192, 100000, typeTraits, cmp);
-        	//String btreeName = "071211";
-        	//BTreeSearchRunner runner = new BTreeSearchRunner(btreeName, 10, numBatches, 8192, 25000, typeTraits, cmp);
-        	//LSMTreeRunner runner = new LSMTreeRunner(numBatches, 8192, 100, 8192, 250, typeTraits, cmp);
-        	//LSMTreeSearchRunner runner = new LSMTreeSearchRunner(100000, numBatches, 8192, 24750, 8192, 250, typeTraits, cmp); 
-            DataGenThread dataGen = new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, 30, 50, 10, false);
+            //BTreeRunner runner = new BTreeRunner(numBatches, 8192, 100000, typeTraits, cmp);
+            //String btreeName = "071211";
+            //BTreeSearchRunner runner = new BTreeSearchRunner(btreeName, 10, numBatches, 8192, 25000, typeTraits, cmp);
+            //LSMTreeRunner runner = new LSMTreeRunner(numBatches, 8192, 100, 8192, 250, typeTraits, cmp);
+            //LSMTreeSearchRunner runner = new LSMTreeSearchRunner(100000, numBatches, 8192, 24750, 8192, 250, typeTraits, cmp); 
+            int inMemPageSize = 131072; // 128kb
+            int onDiskPageSize = inMemPageSize;
+            int inMemNumPages = 8192; // 1GB
+            int onDiskNumPages = 16384; // 2GB
+            LSMTreeRunner runner = new LSMTreeRunner(numBatches, inMemPageSize, inMemNumPages, onDiskPageSize,
+                    onDiskNumPages, typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate);
+            DataGenThread dataGen = new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, payLoadSize, 50, 10, sorted);
             dataGen.start();
             runner.reset();
             times[i] = runner.runExperiment(dataGen, numThreads);