1. Fix the "writerCount!=0 during component flushing" issue
2. Fix the duplicate LSM disk component file name issue by avoiding duplicate timestamps for different components.

Note that this change includes https://asterix-gerrit.ics.uci.edu/#/c/268/.

Change-Id: I805eab33603f52e19a1b76f1c315f9b75b6e3c03
Reviewed-on: https://asterix-gerrit.ics.uci.edu/278
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index 7346d9c..042b2c9 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -50,4 +50,9 @@
     public long getComponentSize() {
         return btree.getFileReference().getFile().length() + bloomFilter.getFileReference().getFile().length();
     }
+
+    @Override
+    public int getFileReferenceCount() {
+        return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 2f39e70..41a49b5 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,7 +22,6 @@
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -49,8 +48,7 @@
 
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() {
-        Date date = new Date();
-        String ts = formatter.format(date);
+        String ts = getCurrentTimestamp();
         String baseName = baseDir + ts + SPLIT_STRING + ts;
         // Begin timestamp and end timestamp are identical since it is a flush
         return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + BTREE_STRING), null,
@@ -70,6 +68,7 @@
     }
 
     private static FilenameFilter btreeFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.startsWith(".") && name.endsWith(BTREE_STRING);
         }
@@ -168,8 +167,7 @@
 
     @Override
     public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
-        Date date = new Date();
-        String ts = formatter.format(date);
+        String ts = getCurrentTimestamp();
         // Create transaction lock file
         Files.createFile(Paths.get(baseDir + TRANSACTION_PREFIX + ts));
 
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
index 85063de..54d7cc6 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
@@ -21,45 +21,49 @@
 
 public class LSMBTreeWithBuddyDiskComponent extends AbstractDiskLSMComponent {
 
-	private final BTree btree;
-	private final BTree buddyBtree;
-	private final BloomFilter bloomFilter;
+    private final BTree btree;
+    private final BTree buddyBtree;
+    private final BloomFilter bloomFilter;
 
-	public LSMBTreeWithBuddyDiskComponent(BTree btree, BTree buddyBtree,
-			BloomFilter bloomFilter) {
-		this.btree = btree;
-		this.buddyBtree = buddyBtree;
-		this.bloomFilter = bloomFilter;
-	}
+    public LSMBTreeWithBuddyDiskComponent(BTree btree, BTree buddyBtree, BloomFilter bloomFilter) {
+        this.btree = btree;
+        this.buddyBtree = buddyBtree;
+        this.bloomFilter = bloomFilter;
+    }
 
-	@Override
-	protected void destroy() throws HyracksDataException {
-		btree.deactivate();
-		btree.destroy();
-		buddyBtree.deactivate();
-		buddyBtree.destroy();
-		bloomFilter.deactivate();
-		bloomFilter.destroy();
-	}
+    @Override
+    protected void destroy() throws HyracksDataException {
+        btree.deactivate();
+        btree.destroy();
+        buddyBtree.deactivate();
+        buddyBtree.destroy();
+        bloomFilter.deactivate();
+        bloomFilter.destroy();
+    }
 
-	public BTree getBTree() {
-		return btree;
-	}
+    public BTree getBTree() {
+        return btree;
+    }
 
-	public BTree getBuddyBTree() {
-		return buddyBtree;
-	}
+    public BTree getBuddyBTree() {
+        return buddyBtree;
+    }
 
-	public BloomFilter getBloomFilter() {
-		return bloomFilter;
-	}
+    public BloomFilter getBloomFilter() {
+        return bloomFilter;
+    }
 
-	@Override
-	public long getComponentSize() {
-		long size = btree.getFileReference().getFile().length();
-		size += buddyBtree.getFileReference().getFile().length();
-		size += bloomFilter.getFileReference().getFile().length();
-		return size;
-	}
+    @Override
+    public long getComponentSize() {
+        long size = btree.getFileReference().getFile().length();
+        size += buddyBtree.getFileReference().getFile().length();
+        size += bloomFilter.getFileReference().getFile().length();
+        return size;
+    }
+
+    @Override
+    public int getFileReferenceCount() {
+        return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
+    }
 
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
index 51ee0fb..cb7dca9 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
@@ -21,7 +21,6 @@
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -43,12 +42,14 @@
     private final TreeIndexFactory<? extends ITreeIndex> buddyBtreeFactory;
 
     private static FilenameFilter btreeFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.startsWith(".") && name.endsWith(BTREE_STRING);
         }
     };
 
     private static FilenameFilter buddyBtreeFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.startsWith(".") && name.endsWith(BUDDY_BTREE_STRING);
         }
@@ -64,8 +65,7 @@
 
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() {
-        Date date = new Date();
-        String ts = formatter.format(date);
+        String ts = getCurrentTimestamp();
         String baseName = baseDir + ts + SPLIT_STRING + ts;
         // Begin timestamp and end timestamp are identical since it is a flush
         return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + BTREE_STRING),
@@ -199,8 +199,7 @@
 
     @Override
     public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
-        Date date = new Date();
-        String ts = formatter.format(date);
+        String ts = getCurrentTimestamp();
         // Create transaction lock file
         Files.createFile(Paths.get(baseDir + TRANSACTION_PREFIX + ts));
 
@@ -215,8 +214,9 @@
         FilenameFilter transactionFilter;
         File dir = new File(baseDir);
         String[] files = dir.list(transactionFileNameFilter);
-        if (files.length == 0)
+        if (files.length == 0) {
             return null;
+        }
         if (files.length != 1) {
             throw new HyracksDataException("More than one transaction lock found:" + files.length);
         } else {
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
index c3695cc..be67611 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
@@ -59,16 +59,20 @@
 
     /**
      * Populates the context's component holder with a snapshot of the components involved in the operation.
-     * 
+     *
      * @param ctx
      *            - the operation's context
      * @throws HyracksDataException
      */
     public void getOperationalComponents(ILSMIndexOperationContext ctx) throws HyracksDataException;
 
+    public List<ILSMComponent> getInactiveDiskComponents();
+
+    public void addInactiveDiskComponent(ILSMComponent diskComponent);
+
     /**
      * Persistent the LSM component
-     * 
+     *
      * @param lsmComponent
      *            , the component to be persistent
      * @throws HyracksDataException
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
index 96c669d..8c1d826 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -42,8 +42,8 @@
                 break;
             case MERGE:
                 if (state == ComponentState.READABLE_MERGING) {
-                    // This should never happen unless there are two concurrent merges that were scheduled 
-                    // concurrently and they have interleaving components to be merged. 
+                    // This should never happen unless there are two concurrent merges that were scheduled
+                    // concurrently and they have interleaving components to be merged.
                     // This should be handled properly by the merge policy, but we guard against that here anyway.
                     return false;
                 }
@@ -97,4 +97,6 @@
 
     public abstract long getComponentSize();
 
+    public abstract int getFileReferenceCount();
+
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 670f8dd..a14e3a7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -46,16 +46,17 @@
     protected final ILSMIOOperationScheduler ioScheduler;
     protected final ILSMIOOperationCallback ioOpCallback;
 
-    // In-memory components.   
+    // In-memory components.
     protected final List<ILSMComponent> memoryComponents;
     protected final List<IVirtualBufferCache> virtualBufferCaches;
     protected AtomicInteger currentMutableComponentId;
 
-    // On-disk components.    
+    // On-disk components.
     protected final IBufferCache diskBufferCache;
     protected final ILSMIndexFileManager fileManager;
     protected final IFileMapProvider diskFileMapProvider;
     protected final List<ILSMComponent> diskComponents;
+    protected final List<ILSMComponent> inactiveDiskComponents;
     protected final double bloomFilterFalsePositiveRate;
     protected final ILSMComponentFilterFrameFactory filterFrameFactory;
     protected final LSMComponentFilterManager filterManager;
@@ -82,6 +83,7 @@
         this.filterFrameFactory = filterFrameFactory;
         this.filterManager = filterManager;
         this.filterFields = filterFields;
+        this.inactiveDiskComponents = new LinkedList<ILSMComponent>();
         this.durable = durable;
         lsmHarness = new LSMHarness(this, mergePolicy, opTracker);
         isActivated = false;
@@ -109,6 +111,7 @@
         lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker);
         isActivated = false;
         diskComponents = new LinkedList<ILSMComponent>();
+        this.inactiveDiskComponents = new LinkedList<ILSMComponent>();
         // Memory related objects are nulled
         this.virtualBufferCaches = null;
         memoryComponents = null;
@@ -122,7 +125,7 @@
     protected void forceFlushDirtyPages(ITreeIndex treeIndex) throws HyracksDataException {
         int fileId = treeIndex.getFileId();
         IBufferCache bufferCache = treeIndex.getBufferCache();
-        // Flush all dirty pages of the tree. 
+        // Flush all dirty pages of the tree.
         // By default, metadata and data are flushed asynchronously in the buffercache.
         // This means that the flush issues writes to the OS, but the data may still lie in filesystem buffers.
         ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
@@ -168,7 +171,7 @@
             bufferCache.unpin(metadataPage);
         }
 
-        // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page 
+        // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
         // won't be flushed to disk because it won't be dirty until the write latch has been released.
         metadataPage = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId));
         if (metadataPage != null) {
@@ -276,4 +279,18 @@
     public ComponentState getCurrentMutableComponentState() {
         return ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).getState();
     }
+
+    public int getCurrentMutableComponentWriterCount() {
+        return ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).getWriterCount();
+    }
+
+    @Override
+    public List<ILSMComponent> getInactiveDiskComponents() {
+        return inactiveDiskComponents;
+    }
+
+    @Override
+    public void addInactiveDiskComponent(ILSMComponent diskComponent) {
+        inactiveDiskComponents.add(diskComponent);
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 1676560..4a771ea 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -53,9 +53,10 @@
     protected final Format formatter = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
     protected final Comparator<String> cmp = new FileNameComparator();
     protected final Comparator<ComparableFileName> recencyCmp = new RecencyComparator();
-
     protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
 
+    private String prevTimestamp = null;
+
     public AbstractLSMIndexFileManager(IFileMapProvider fileMapProvider, FileReference file,
             TreeIndexFactory<? extends ITreeIndex> treeFactory) {
         this.baseDir = file.getFile().getPath();
@@ -67,6 +68,7 @@
     }
 
     private static FilenameFilter fileNameFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.startsWith(".");
         }
@@ -96,7 +98,7 @@
 
     protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
             TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
-            throws HyracksDataException, IndexException {
+                    throws HyracksDataException, IndexException {
         File dir = new File(baseDir);
         String[] files = dir.list(filter);
         for (String fileName : files) {
@@ -149,6 +151,7 @@
     }
 
     protected static FilenameFilter bloomFilterFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.startsWith(".") && name.endsWith(BLOOM_FILTER_STRING);
         }
@@ -164,8 +167,7 @@
 
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() {
-        Date date = new Date();
-        String ts = formatter.format(date);
+        String ts = getCurrentTimestamp();
         // Begin timestamp and end timestamp are identical since it is a flush
         return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null, null);
     }
@@ -215,8 +217,8 @@
                 last = current;
             } else if (current.interval[0].compareTo(last.interval[0]) >= 0
                     && current.interval[1].compareTo(last.interval[1]) <= 0) {
-                // The current file is completely contained in the interval of the 
-                // last file. Thus the last file must contain at least as much information 
+                // The current file is completely contained in the interval of the
+                // last file. Thus the last file must contain at least as much information
                 // as the current file, so delete the current file.
                 current.fileRef.delete();
             } else {
@@ -260,6 +262,7 @@
         return baseDir;
     }
 
+    @Override
     public void recoverTransaction() throws HyracksDataException {
         File dir = new File(baseDir);
         String[] files = dir.list(transactionFileNameFilter);
@@ -354,12 +357,14 @@
     }
 
     protected static FilenameFilter transactionFileNameFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return name.startsWith(".T");
         }
     };
 
     protected static FilenameFilter dummyFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return true;
         }
@@ -369,6 +374,7 @@
         final String timeStamp = transactionFileName.substring(transactionFileName.indexOf(TRANSACTION_PREFIX)
                 + TRANSACTION_PREFIX.length());
         return new FilenameFilter() {
+            @Override
             public boolean accept(File dir, String name) {
                 if (inclusive) {
                     return name.startsWith(timeStamp);
@@ -397,4 +403,27 @@
             }
         };
     }
+
+    /**
+     * @return The string format of the current timestamp.
+     *         The returned results of this method are guaranteed to not have duplicates.
+     */
+    protected String getCurrentTimestamp() {
+        Date date = new Date();
+        String ts = formatter.format(date);
+        /**
+         * prevent a corner case where the same timestamp can be given.
+         */
+        while (prevTimestamp != null && ts.compareTo(prevTimestamp) == 0) {
+            try {
+                Thread.sleep(1);
+                date = new Date();
+                ts = formatter.format(date);
+            } catch (InterruptedException e) {
+                //ignore
+            }
+        }
+        prevTimestamp = ts;
+        return ts;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
index 91e0ad0..1d93331 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -84,7 +84,7 @@
                 break;
             case SEARCH:
                 if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE
-                        || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
+                || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
                     readerCount++;
                 } else {
                     return false;
@@ -195,4 +195,8 @@
             filter.reset();
         }
     }
+
+    public int getWriterCount() {
+        return writerCount;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index a52b9d9..d861404 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,7 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
@@ -64,8 +65,26 @@
                         if (!((AbstractMemoryLSMComponent) flushingComponent).isModified()) {
                             //The mutable component has not been modified by any writer. There is nothing to flush.
                             //since the component is empty, set its state back to READABLE_WRITABLE
-                            ((AbstractLSMIndex) lsmIndex)
-                                    .setCurrentMutableComponentState(ComponentState.READABLE_WRITABLE);
+                            if (((AbstractLSMIndex) lsmIndex).getCurrentMutableComponentState() == ComponentState.READABLE_UNWRITABLE) {
+                                ((AbstractLSMIndex) lsmIndex)
+                                        .setCurrentMutableComponentState(ComponentState.READABLE_WRITABLE);
+                            }
+                            return false;
+                        }
+                        if (((AbstractMemoryLSMComponent) flushingComponent).getWriterCount() > 0) {
+                            /*
+                             * This case is a case where even though FLUSH log was flushed to disk and scheduleFlush is triggered,
+                             * the current in-memory component (whose state was changed to READABLE_WRITABLE (RW)
+                             * from READABLE_UNWRITABLE(RU) before FLUSH log was written to log tail (which is memory buffer of log file)
+                             * and then the state was changed back to RW (as shown in the following scenario)) can have writers
+                             * based on the current code base/design.
+                             * Thus, the writer count of the component may be greater than 0.
+                             * if this happens, intead of throwing exception, scheduleFlush() deal with this situation by not flushing
+                             * the component.
+                             * Please see issue 884 for more detail information:
+                             * https://code.google.com/p/asterixdb/issues/detail?id=884&q=owner%3Akisskys%40gmail.com&colspec=ID%20Type%20Status%20Priority%20Milestone%20Owner%20Summary%20ETA%20Severity
+                             *
+                             */
                             return false;
                         }
                         break;
@@ -84,7 +103,7 @@
                 }
                 try {
                     // Flush and merge operations should never reach this wait call, because they are always try operations.
-                    // If they fail to enter the components, then it means that there are an ongoing flush/merge operation on 
+                    // If they fail to enter the components, then it means that there are an ongoing flush/merge operation on
                     // the same components, so they should not proceed.
                     opTracker.wait();
                 } catch (InterruptedException e) {
@@ -133,7 +152,7 @@
                 // Changing the flush status should *always* precede changing the mutable component.
                 lsmIndex.changeFlushStatusForCurrentMutableCompoent(false);
                 lsmIndex.changeMutableComponent();
-                // Notify all waiting threads whenever a flush has been scheduled since they will check 
+                // Notify all waiting threads whenever a flush has been scheduled since they will check
                 // again if they can grab and enter the mutable component.
                 opTracker.notifyAll();
                 break;
@@ -148,78 +167,118 @@
 
     private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMComponent newComponent,
             boolean failedOperation) throws HyracksDataException, IndexException {
-        synchronized (opTracker) {
-            try {
-                int i = 0;
-                // First check if there is any action that is needed to be taken based on the state of each component.
-                for (ILSMComponent c : ctx.getComponentHolder()) {
-                    boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY ? true : false;
-                    c.threadExit(opType, failedOperation, isMutableComponent);
-                    if (c.getType() == LSMComponentType.MEMORY) {
-                        switch (c.getState()) {
-                            case READABLE_UNWRITABLE:
-                                if (isMutableComponent
-                                        && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) {
-                                    lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
-                                }
-                                break;
-                            case INACTIVE:
-                                ((AbstractMemoryLSMComponent) c).reset();
-                                // Notify all waiting threads whenever the mutable component's has change to inactive. This is important because
-                                // even though we switched the mutable components, it is possible that the component that we just switched
-                                // to is still busy flushing its data to disk. Thus, the notification that was issued upon scheduling the flush
-                                // is not enough. 
-                                opTracker.notifyAll();
-                                break;
-                            default:
-                                break;
+        List<ILSMComponent> inactiveDiskComponents = null;
+        List<ILSMComponent> inactiveDiskComponentsToBeDeleted = null;
+        try {
+            synchronized (opTracker) {
+                try {
+                    int i = 0;
+                    // First check if there is any action that is needed to be taken based on the state of each component.
+                    for (ILSMComponent c : ctx.getComponentHolder()) {
+                        boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY ? true : false;
+                        c.threadExit(opType, failedOperation, isMutableComponent);
+                        if (c.getType() == LSMComponentType.MEMORY) {
+                            switch (c.getState()) {
+                                case READABLE_UNWRITABLE:
+                                    if (isMutableComponent
+                                            && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) {
+                                        lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
+                                    }
+                                    break;
+                                case INACTIVE:
+                                    ((AbstractMemoryLSMComponent) c).reset();
+                                    // Notify all waiting threads whenever the mutable component's has change to inactive. This is important because
+                                    // even though we switched the mutable components, it is possible that the component that we just switched
+                                    // to is still busy flushing its data to disk. Thus, the notification that was issued upon scheduling the flush
+                                    // is not enough.
+                                    opTracker.notifyAll();
+                                    break;
+                                default:
+                                    break;
+                            }
+                        } else {
+                            switch (c.getState()) {
+                                case INACTIVE:
+                                    lsmIndex.addInactiveDiskComponent(c);
+                                    break;
+                                default:
+                                    break;
+                            }
                         }
+                        i++;
+                    }
+                    // Then, perform any action that is needed to be taken based on the operation type.
+                    switch (opType) {
+                        case FLUSH:
+                            // newComponent is null if the flush op. was not performed.
+                            if (newComponent != null) {
+                                lsmIndex.addComponent(newComponent);
+                                mergePolicy.diskComponentAdded(lsmIndex, false);
+                            }
+                            break;
+                        case MERGE:
+                            // newComponent is null if the merge op. was not performed.
+                            if (newComponent != null) {
+                                lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
+                                mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
+                            }
+                            break;
+                        default:
+                            break;
+                    }
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                    throw e;
+                } finally {
+                    if (failedOperation
+                            && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) {
+                        //When the operation failed, completeOperation() method must be called
+                        //in order to decrement active operation count which was incremented in beforeOperation() method.
+                        opTracker.completeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
+                                ctx.getModificationCallback());
                     } else {
-                        switch (c.getState()) {
-                            case INACTIVE:
-                                ((AbstractDiskLSMComponent) c).destroy();
-                                break;
-                            default:
-                                break;
+                        opTracker.afterOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
+                                ctx.getModificationCallback());
+                    }
+
+                    /*
+                     * = Inactive disk components lazy cleanup if any =
+                     * Prepare to cleanup inactive diskComponents which were old merged components
+                     * and not anymore accessed.
+                     * This cleanup is done outside of optracker synchronized block.
+                     */
+                    inactiveDiskComponents = lsmIndex.getInactiveDiskComponents();
+                    if (!inactiveDiskComponents.isEmpty()) {
+                        for (ILSMComponent inactiveComp : inactiveDiskComponents) {
+                            if (((AbstractDiskLSMComponent) inactiveComp).getFileReferenceCount() == 1) {
+                                if (inactiveDiskComponentsToBeDeleted == null) {
+                                    inactiveDiskComponentsToBeDeleted = new LinkedList<ILSMComponent>();
+                                }
+                                inactiveDiskComponentsToBeDeleted.add(inactiveComp);
+                            }
+                        }
+                        if (inactiveDiskComponentsToBeDeleted != null) {
+                            inactiveDiskComponents.removeAll(inactiveDiskComponentsToBeDeleted);
                         }
                     }
-                    i++;
                 }
-                // Then, perform any action that is needed to be taken based on the operation type.
-                switch (opType) {
-                    case FLUSH:
-                        // newComponent is null if the flush op. was not performed.
-                        if (newComponent != null) {
-                            lsmIndex.addComponent(newComponent);
-                            mergePolicy.diskComponentAdded(lsmIndex, false);
-                        }
-                        break;
-                    case MERGE:
-                        // newComponent is null if the merge op. was not performed.
-                        if (newComponent != null) {
-                            lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
-                            mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
-                        }
-                        break;
-                    default:
-                        break;
-                }
-            } catch (Throwable e) {
-                e.printStackTrace();
-                throw e;
-            } finally {
-                if (failedOperation
-                        && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) {
-                    //When the operation failed, completeOperation() method must be called 
-                    //in order to decrement active operation count which was incremented in beforeOperation() method.
-                    opTracker.completeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
-                            ctx.getModificationCallback());
-                } else {
-                    opTracker.afterOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
-                            ctx.getModificationCallback());
+            }
+        } finally {
+            /*
+             * cleanup inactive disk components if any
+             */
+            if (inactiveDiskComponentsToBeDeleted != null) {
+                try {
+                    for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) {
+                        ((AbstractDiskLSMComponent) c).destroy();
+                    }
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                    throw e;
                 }
             }
         }
+
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 8af857c..3c009c1 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -141,4 +141,9 @@
         return null;
     }
 
+    @Override
+    public int getFileReferenceCount(int fileId) {
+        return 0;
+    }
+
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index 18d5e8b..80b2897 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -341,6 +341,7 @@
         }
 
     }
+
     //These 4 methods aren't applicable here.
     @Override
     public int createMemFile() throws HyracksDataException {
@@ -365,4 +366,9 @@
         // TODO Auto-generated method stub
         return null;
     }
+
+    @Override
+    public int getFileReferenceCount(int fileId) {
+        return 0;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 6efefd6..ecc67f3 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -65,4 +65,9 @@
                 + deletedKeysBTree.getFileReference().getFile().length()
                 + bloomFilter.getFileReference().getFile().length();
     }
+
+    @Override
+    public int getFileReferenceCount() {
+        return deletedKeysBTree.getBufferCache().getFileReferenceCount(deletedKeysBTree.getFileId());
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 825e5d3..c4398b7 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,7 +19,6 @@
 import java.io.FilenameFilter;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -43,18 +42,21 @@
     private final BTreeFactory btreeFactory;
 
     private static FilenameFilter dictBTreeFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.startsWith(".") && name.endsWith(DICT_BTREE_SUFFIX);
         }
     };
 
     private static FilenameFilter invListFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.startsWith(".") && name.endsWith(INVLISTS_SUFFIX);
         }
     };
 
     private static FilenameFilter deletedKeysBTreeFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.startsWith(".") && name.endsWith(DELETED_KEYS_BTREE_SUFFIX);
         }
@@ -67,8 +69,7 @@
 
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() {
-        Date date = new Date();
-        String ts = formatter.format(date);
+        String ts = getCurrentTimestamp();
         String baseName = baseDir + ts + SPLIT_STRING + ts;
         // Begin timestamp and end timestamp are identical since it is a flush
         return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + DICT_BTREE_SUFFIX),
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index ecf8e59..82f43e5 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -66,4 +66,9 @@
         }
         return size;
     }
+
+    @Override
+    public int getFileReferenceCount() {
+        return rtree.getBufferCache().getFileReferenceCount(rtree.getFileId());
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index cca640f..7e3fb06 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,7 +22,6 @@
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -44,12 +43,14 @@
     private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
 
     private static FilenameFilter btreeFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.startsWith(".") && name.endsWith(BTREE_STRING);
         }
     };
 
     private static FilenameFilter rtreeFilter = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             return !name.startsWith(".") && name.endsWith(RTREE_STRING);
         }
@@ -64,8 +65,7 @@
 
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() {
-        Date date = new Date();
-        String ts = formatter.format(date);
+        String ts = getCurrentTimestamp();
         String baseName = baseDir + ts + SPLIT_STRING + ts;
         // Begin timestamp and end timestamp are identical since it is a flush
         return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + RTREE_STRING),
@@ -195,8 +195,7 @@
 
     @Override
     public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
-        Date date = new Date();
-        String ts = formatter.format(date);
+        String ts = getCurrentTimestamp();
         // Create transaction lock file
         Files.createFile(Paths.get(baseDir + TRANSACTION_PREFIX + ts));
 
@@ -211,8 +210,9 @@
         FilenameFilter transactionFilter;
         File dir = new File(baseDir);
         String[] files = dir.list(transactionFileNameFilter);
-        if (files.length == 0)
+        if (files.length == 0) {
             return null;
+        }
         if (files.length != 1) {
             throw new HyracksDataException("More than one transaction lock found:" + files.length);
         } else {
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 4248dc6..f5b5b17 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,10 +18,10 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.HashSet;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -169,12 +169,12 @@
 
     @Override
     /**
-     * Takes a virtual page, and copies it to a new page at the physical identifier. 
+     * Takes a virtual page, and copies it to a new page at the physical identifier.
      */
-    //TODO: I should not have to copy the page. I should just append it to the end of the hash bucket, but this is 
-    //safer/easier for now. 
+    //TODO: I should not have to copy the page. I should just append it to the end of the hash bucket, but this is
+    //safer/easier for now.
     public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException {
-        CachedPage virtPage = findPage(vpid, true); //should definitely succeed. 
+        CachedPage virtPage = findPage(vpid, true); //should definitely succeed.
         //pinSanityCheck(dpid); //debug
         ICachedPage realPage = pin(dpid, false);
         virtPage.acquireReadLatch();
@@ -366,7 +366,7 @@
         buffer.append("Buffer cache state\n");
         buffer.append("Page Size: ").append(pageSize).append('\n');
         buffer.append("Number of physical pages: ").append(pageReplacementStrategy.getMaxAllowedNumPages())
-                .append('\n');
+        .append('\n');
         buffer.append("Hash table size: ").append(pageMap.length).append('\n');
         buffer.append("Page Map:\n");
         int nCachedPages = 0;
@@ -379,10 +379,10 @@
                     buffer.append("   ").append(i).append('\n');
                     while (cp != null) {
                         buffer.append("      ").append(cp.cpid).append(" -> [")
-                                .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
-                                .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
-                                .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
-                                .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
+                        .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
+                        .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
+                        .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+                        .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
                         cp = cp.next;
                         ++nCachedPages;
                     }
@@ -761,6 +761,18 @@
     }
 
     @Override
+    public synchronized int getFileReferenceCount(int fileId) {
+        synchronized (fileInfoMap) {
+            BufferedFileHandle fInfo = fileInfoMap.get(fileId);
+            if (fInfo != null) {
+                return fInfo.getReferenceCount();
+            } else {
+                return 0;
+            }
+        }
+    }
+
+    @Override
     public synchronized void deleteMemFile(int fileId) throws HyracksDataException {
         //TODO: possible sanity chcecking here like in above?
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -769,7 +781,7 @@
         synchronized (virtualFiles) {
             virtualFiles.remove(fileId);
         }
-        synchronized(fileInfoMap){
+        synchronized (fileInfoMap) {
             fileMapManager.unregisterMemFile(fileId);
         }
     }
@@ -792,8 +804,8 @@
         cachedPages.add(page);
     }
 
+    @Override
     public void dumpState(OutputStream os) throws IOException {
         os.write(dumpState().getBytes());
     }
-
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
index 743de15..3a88091 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -173,7 +173,7 @@
 
     @Override
     public void deleteMemFile(int fileId) throws HyracksDataException {
-        bufferCache.deleteMemFile(fileId); 
+        bufferCache.deleteMemFile(fileId);
     }
 
     @Override
@@ -188,4 +188,9 @@
         return bufferCache.unpinVirtual(vpid, dpid);
     }
 
+    @Override
+    public int getFileReferenceCount(int fileId) {
+        return bufferCache.getFileReferenceCount(fileId);
+    }
+
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index a8e17a5..478c641 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,12 +15,11 @@
 package edu.uci.ics.hyracks.storage.common.buffercache;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.FileReference;
 
 public interface IBufferCache {
     public void createFile(FileReference fileRef) throws HyracksDataException;
-    
+
     public int createMemFile() throws HyracksDataException;
 
     public void openFile(int fileId) throws HyracksDataException;
@@ -34,9 +33,9 @@
     public ICachedPage tryPin(long dpid) throws HyracksDataException;
 
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
-    
+
     public ICachedPage pinVirtual(long vpid) throws HyracksDataException;
-    
+
     public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException;
 
     public void unpin(ICachedPage page) throws HyracksDataException;
@@ -49,5 +48,7 @@
 
     public int getNumPages();
 
+    public int getFileReferenceCount(int fileId);
+
     public void close() throws HyracksDataException;
 }
\ No newline at end of file