Merge branch 'fullstack_lsm_staging' into salsubaiee/fullstack_lsm_staging_asterix_issue_367
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 2847004..81c2367 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -647,4 +647,9 @@
             btree.validate();
         }
     }
+    
+    @Override
+    public String toString() {
+        return "LSMBTree [" + fileManager.getBaseDir() + "]"; 
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index e4dd369..12c7ea7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -190,4 +190,9 @@
     public IBufferCache getBufferCache() {
         return diskBufferCache;
     }
+
+    @Override
+    public String toString() {
+        return "LSMIndex [" + fileManager.getBaseDir() + "]";
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 65e6a3d..7372b86 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -16,8 +16,9 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -35,6 +36,8 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 
 public class LSMHarness implements ILSMHarness {
+    private static final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
+
     private final ILSMIndexInternal lsmIndex;
     private final ILSMMergePolicy mergePolicy;
     private final ILSMOperationTracker opTracker;
@@ -171,7 +174,7 @@
         }
 
         lsmIndex.setFlushStatus(false);
-        
+
         if (!lsmIndex.scheduleFlush(ctx, callback)) {
             callback.beforeOperation();
             callback.afterOperation(null, null);
@@ -184,7 +187,11 @@
     public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
             IndexException {
         operation.getCallback().beforeOperation();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(lsmIndex + ": flushing");
+        }
         ILSMComponent newComponent = lsmIndex.flush(operation);
+        
         operation.getCallback().afterOperation(null, newComponent);
         lsmIndex.markAsValid(newComponent);
         operation.getCallback().afterFinalize(newComponent);
@@ -215,6 +222,9 @@
             IndexException {
         List<ILSMComponent> mergedComponents = new ArrayList<ILSMComponent>();
         operation.getCallback().beforeOperation();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(lsmIndex + ": merging");
+        }
         ILSMComponent newComponent = lsmIndex.merge(mergedComponents, operation);
         ctx.getComponentHolder().addAll(mergedComponents);
         operation.getCallback().afterOperation(mergedComponents, newComponent);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 33052f3..30fdd27 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -566,6 +566,7 @@
     public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
         private final ILSMComponent component;
         private final IIndexBulkLoader invIndexBulkLoader;
+        private boolean exceptionCaught = false;
 
         public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
                 throws IndexException {
@@ -599,6 +600,7 @@
         }
 
         protected void handleException() throws HyracksDataException {
+            exceptionCaught = true;
             ((LSMInvertedIndexImmutableComponent) component).getInvIndex().deactivate();
             ((LSMInvertedIndexImmutableComponent) component).getInvIndex().destroy();
             ((LSMInvertedIndexImmutableComponent) component).getDeletedKeysBTree().deactivate();
@@ -609,7 +611,9 @@
 
         @Override
         public void end() throws IndexException, HyracksDataException {
-            invIndexBulkLoader.end();
+            if (!exceptionCaught) {
+                invIndexBulkLoader.end();
+            }
             lsmHarness.addBulkLoadedComponent(component);
         }
     }
@@ -732,4 +736,9 @@
             component.getDeletedKeysBTree().validate();
         }
     }
+    
+    @Override
+    public String toString() {
+        return "LSMInvertedIndex [" + fileManager.getBaseDir() + "]"; 
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 9b377a9..4b2e075 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -26,6 +26,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
@@ -55,6 +56,7 @@
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree.RTreeAccessor;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public abstract class AbstractLSMRTree extends AbstractLSMIndex implements ITreeIndex {
@@ -279,6 +281,8 @@
             throw new UnsupportedOperationException("Physical delete not supported in the LSM-RTree");
         }
 
+        ctx.modificationCallback.before(tuple);
+        ctx.modificationCallback.found(null, tuple);
         if (ctx.getOperation() == IndexOperation.INSERT) {
             // Before each insert, we must check whether there exist a killer
             // tuple in the memBTree. If we find a killer tuple, we must truly
@@ -323,13 +327,16 @@
     }
 
     protected LSMRTreeOpContext createOpContext(IModificationOperationCallback modCallback) {
-        return new LSMRTreeOpContext((RTree.RTreeAccessor) mutableComponent.getRTree().createAccessor(modCallback,
-                NoOpOperationCallback.INSTANCE), (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame(),
+        RTreeAccessor rtreeAccessor = (RTree.RTreeAccessor) mutableComponent.getRTree().createAccessor(
+                NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        BTreeAccessor btreeAccessor = (BTree.BTreeAccessor) mutableComponent.getBTree().createAccessor(
+                NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        
+        return new LSMRTreeOpContext(rtreeAccessor, (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame(),
                 (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(), memFreePageManager
-                        .getMetaDataFrameFactory().createFrame(), 4, (BTree.BTreeAccessor) mutableComponent.getBTree()
-                        .createAccessor(modCallback, NoOpOperationCallback.INSTANCE), btreeLeafFrameFactory,
+                        .getMetaDataFrameFactory().createFrame(), 4, btreeAccessor, btreeLeafFrameFactory,
                 btreeInteriorFrameFactory, memFreePageManager.getMetaDataFrameFactory().createFrame(),
-                rtreeCmpFactories, btreeCmpFactories, null, null);
+                rtreeCmpFactories, btreeCmpFactories, modCallback, NoOpOperationCallback.INSTANCE);
     }
 
     @Override
@@ -355,4 +362,9 @@
         InMemoryBufferCache memBufferCache = (InMemoryBufferCache) mutableComponent.getRTree().getBufferCache();
         return memBufferCache.getNumPages() * memBufferCache.getPageSize();
     }
+    
+    @Override
+    public String toString() {
+        return "LSMRTree [" + fileManager.getBaseDir() + "]"; 
+    }
 }