Fixed a deadlock when stopping the dataset lifecycle manager.
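
The dataset-closing sequence (wait for in-flight IO, flush each index, deactivate without a second flush, release the virtual buffer cache budget) is factored out into a private closeDataset() helper; stop() now closes each dataset through that helper instead of calling deactivate() on the open indexes directly, and start() resets the memory accounting. As a rough, self-contained sketch of the resulting shape (illustrative names only, not the real AsterixDB classes):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch only: toy stand-ins for DatasetInfo/IndexInfo showing the
    // close sequence that the patch centralizes in closeDataset().
    public class LifecycleManagerSketch {

        static class IndexSketch {
            boolean open = true;
            void flushAndWait() { /* schedule a flush and block until its IO completes */ }
            void deactivate(boolean flushOnClose) { open = false; }
        }

        static class DatasetSketch {
            int numActiveIOOps;
            boolean open = true;
            long memoryBudget;
            final List<IndexSketch> indexes = new ArrayList<>();
        }

        private final Map<Integer, DatasetSketch> datasets = new HashMap<>();
        private long used;

        // Eviction path (first hunk below): close a dataset once nothing uses it.
        public synchronized boolean closeIfUnused(int datasetId) throws InterruptedException {
            DatasetSketch ds = datasets.get(datasetId);
            if (ds != null && ds.open) {
                closeDataset(ds);
                return true;
            }
            return false;
        }

        // Shutdown path (last hunk below): reuse the same helper instead of
        // deactivating the open indexes directly, as the old stop() did.
        public synchronized void stop() throws InterruptedException {
            for (DatasetSketch ds : datasets.values()) {
                closeDataset(ds);
            }
            datasets.clear();
        }

        private void closeDataset(DatasetSketch ds) throws InterruptedException {
            // 1. Wait for in-flight IO on the dataset (caller holds the monitor).
            while (ds.numActiveIOOps > 0) {
                wait();
            }
            // 2. Flush the indexes one at a time; the patch keeps this sequential to
            //    avoid a deadlock with the per-index operation tracker.
            for (IndexSketch index : ds.indexes) {
                index.flushAndWait();
            }
            // 3. Deactivate without flushing again, then give back the memory budget.
            for (IndexSketch index : ds.indexes) {
                if (index.open) {
                    index.deactivate(false);
                }
            }
            ds.open = false;
            used -= ds.memoryBudget;
        }
    }
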
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index ba80c29..fcb0786 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -214,35 +214,7 @@
             if (opTracker != null && opTracker.getNumActiveOperations() == 0 && dsInfo.referenceCount == 0
                     && dsInfo.isOpen) {
 
-                // First wait for any ongoing IO operations
-                while (dsInfo.numActiveIOOps > 0) {
-                    try {
-                        wait();
-                    } catch (InterruptedException e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-
-                for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                    // TODO: This is not efficient since we flush the indexes sequentially. 
-                    // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
-                    // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
-                    flushAndWaitForIO(dsInfo, iInfo);
-                }
-
-                for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                    if (iInfo.isOpen) {
-                        iInfo.index.deactivate(false);
-                        iInfo.isOpen = false;
-                    }
-                    assert iInfo.referenceCount == 0;
-                }
-                dsInfo.isOpen = false;
-
-                List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
-                for (IVirtualBufferCache vbc : vbcs) {
-                    used -= vbc.getNumPages() * vbc.getPageSize();
-                }
+                closeDataset(dsInfo);
                 return true;
 
             }
@@ -437,6 +409,40 @@
 
     @Override
     public synchronized void start() {
+        used = 0;
+    }
+
+    private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException {
+        // First wait for any ongoing IO operations
+        while (dsInfo.numActiveIOOps > 0) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        for (IndexInfo iInfo : dsInfo.indexes.values()) {
+            // TODO: This is not efficient since we flush the indexes sequentially. 
+            // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
+            // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
+            flushAndWaitForIO(dsInfo, iInfo);
+        }
+
+        for (IndexInfo iInfo : dsInfo.indexes.values()) {
+            if (iInfo.isOpen) {
+                iInfo.index.deactivate(false);
+                iInfo.isOpen = false;
+            }
+            assert iInfo.referenceCount == 0;
+        }
+        dsInfo.isOpen = false;
+
+        List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+        for (IVirtualBufferCache vbc : vbcs) {
+            used -= vbc.getNumPages() * vbc.getPageSize();
+        }
+
     }
 
     @Override
@@ -445,10 +451,10 @@
             dumpState(outputStream);
         }
 
-        List<IIndex> openIndexes = getOpenIndexes();
-        for (IIndex index : openIndexes) {
-            index.deactivate();
+        for (DatasetInfo dsInfo : datasetInfos.values()) {
+            closeDataset(dsInfo);
         }
+
         datasetVirtualBufferCaches.clear();
         datasetOpTrackers.clear();
         datasetInfos.clear();