Fix for ASTERIXDB-1247

It seems like the root of this is the testing harness closing the index/deleting the file before it has
had a chance to flush all of its pages. There are also some changes to cover potential
corner cases where confiscated pages could be lost, but this doesn't seem to directly
affect the bug.

Change-Id: Ia580242b3f7753fc2f793f879332de3270ee3fee
Reviewed-on: https://asterix-gerrit.ics.uci.edu/575
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 7099041..08f98a3 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -959,7 +959,6 @@
     public class BTreeBulkLoader extends AbstractTreeIndex.AbstractTreeIndexBulkLoader {
         protected final ISplitKey splitKey;
         protected final boolean verifyInput;
-        protected List<ICachedPage> pagesToWrite;
 
         public BTreeBulkLoader(float fillFactor, boolean verifyInput, boolean appendOnly) throws TreeIndexException,
                 HyracksDataException {
@@ -967,7 +966,6 @@
             this.verifyInput = verifyInput;
             splitKey = new BTreeSplitKey(leafFrame.getTupleWriter().createTupleReference());
             splitKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
-            pagesToWrite = new ArrayList<ICachedPage>();
         }
 
         @Override
@@ -1003,15 +1001,16 @@
                     splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer(), 0);
                     splitKey.setLeftPage(leafFrontier.pageId);
 
-                    pagesToWrite.clear();
                     propagateBulk(1, pagesToWrite);
                     leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
 
                     ((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId);
+
                     queue.put(leafFrontier.page);
                     for (ICachedPage c : pagesToWrite) {
                         queue.put(c);
                     }
+                    pagesToWrite.clear();
 
                     splitKey.setRightPage(leafFrontier.pageId);
                     leafFrontier.page = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId,
@@ -1027,13 +1026,7 @@
 
                 leafFrame.setPage(leafFrontier.page);
                 ((IBTreeLeafFrame) leafFrame).insertSorted(tuple);
-            } catch (IndexException e) {
-                handleException();
-                throw e;
-            } catch (HyracksDataException e) {
-                handleException();
-                throw e;
-            } catch (RuntimeException e) {
+            } catch (IndexException | HyracksDataException | RuntimeException e) {
                 handleException();
                 throw e;
             }
@@ -1124,14 +1117,14 @@
         }
 
         @Override
-        protected void handleException() throws HyracksDataException {
-            super.handleException();
-        }
-
-        @Override
         public void end() throws HyracksDataException {
-            persistFrontiers(0, -1);
-            super.end();
+            try{
+                persistFrontiers(0, -1);
+                super.end();
+            } catch ( HyracksDataException | RuntimeException e) {
+                handleException();
+                throw e;
+            }
         }
 
         @Override
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 015404d..58ca2d5 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -20,6 +20,7 @@
 package org.apache.hyracks.storage.am.common.impls;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -317,6 +318,7 @@
         protected boolean releasedLatches;
         public boolean appendOnly = false;
         protected final IFIFOPageQueue queue;
+        protected List<ICachedPage> pagesToWrite;
 
         public AbstractTreeIndexBulkLoader(float fillFactor, boolean appendOnly) throws TreeIndexException,
                 HyracksDataException {
@@ -346,8 +348,8 @@
 
             NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
             leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
-            leafFrontier.page = bufferCache.confiscatePage(BufferedFileHandle
-                    .getDiskPageId(fileId, leafFrontier.pageId));
+            leafFrontier.page = bufferCache.confiscatePage(
+                    BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId));
 
             interiorFrame.setPage(leafFrontier.page);
             interiorFrame.initBuffer((byte) 0);
@@ -359,6 +361,7 @@
             slotSize = leafFrame.getSlotSize();
 
             nodeFrontiers.add(leafFrontier);
+            pagesToWrite = new ArrayList<>();
         }
 
         public abstract void add(ITupleReference tuple) throws IndexException, HyracksDataException;
@@ -371,6 +374,9 @@
                     bufferCache.returnPage(frontierPage,false);
                 }
             }
+            for(ICachedPage pageToDiscard: pagesToWrite){
+                bufferCache.returnPage(pageToDiscard, false);
+            }
             releasedLatches = true;
         }
 
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index ae57071..d01bad2 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -883,7 +883,6 @@
         ITreeIndexTupleReference mbrTuple = interiorFrame.createTupleReference();
         ByteBuffer mbr;
         List<Integer> prevNodeFrontierPages = new ArrayList<Integer>();
-        List<ICachedPage> pagesToWrite = new ArrayList<ICachedPage>();
 
         public RTreeBulkLoader(float fillFactor, boolean appendOnly) throws TreeIndexException, HyracksDataException {
             super(fillFactor, appendOnly);
@@ -918,7 +917,6 @@
                     } else {
                         prevNodeFrontierPages.set(0, leafFrontier.pageId);
                     }
-                    pagesToWrite.clear();
                     propagateBulk(1, false, pagesToWrite);
 
                     leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
@@ -926,6 +924,8 @@
                     for (ICachedPage c : pagesToWrite) {
                         queue.put(c);
                     }
+
+                    pagesToWrite.clear();
                     leafFrontier.page = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId,
                             leafFrontier.pageId));
                     leafFrame.setPage(leafFrontier.page);
@@ -1082,4 +1082,4 @@
     public void validate() throws HyracksDataException {
         throw new UnsupportedOperationException("Validation not implemented for R-Trees.");
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index f66726b..75adfc9 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -546,7 +546,7 @@
         }
     }
 
-    private void write(CachedPage cPage) throws HyracksDataException {
+    void write(CachedPage cPage) throws HyracksDataException {
         BufferedFileHandle fInfo = getFileInfo(cPage);
         //synchronize on fInfo to prevent the file handle from being deleted until the page is written.
         synchronized (fInfo) {
@@ -1196,5 +1196,4 @@
                 }
         }
     }
-
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
index 2f66b15..b5d0ac7 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
@@ -30,25 +30,20 @@
     }
 
     @Override
-    public void write(ICachedPage page, IBufferCache ibufferCache) throws HyracksDataException {
-        BufferCache bufferCache = (BufferCache)ibufferCache;
+    public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException {
         CachedPage cPage = (CachedPage)page;
-        BufferedFileHandle fInfo = bufferCache.getFileInfo(cPage);
-        if (fInfo.fileHasBeenDeleted()) {
-            return;
+        try {
+            bufferCache.write(cPage);
+        } finally {
+            bufferCache.returnPage(cPage);
+            if (DEBUG) {
+                System.out.println("[FIFO] Return page: " + cPage.cpid + "," + cPage.dpid);
+            }
         }
-        cPage.buffer.position(0);
-        cPage.buffer.limit(bufferCache.getPageSize());
-        bufferCache.ioManager.syncWrite(fInfo.getFileHandle(), (long) BufferedFileHandle.getPageId(cPage.dpid) * bufferCache.getPageSize(),
-                cPage.buffer);
-        bufferCache.returnPage(cPage);
-        if(DEBUG) System.out.println("[FIFO] Return page: " + cPage.cpid + "," + cPage.dpid);
     }
     
     @Override
-    public void sync(int fileId, IBufferCache ibufferCache) throws HyracksDataException {
-        BufferCache bufferCache = (BufferCache)ibufferCache;
-        BufferedFileHandle fInfo = bufferCache.getFileInfo(fileId);
-        bufferCache.ioManager.sync(fInfo.getFileHandle(), true);
+    public void sync(int fileId, BufferCache bufferCache) throws HyracksDataException {
+        bufferCache.force(fileId,true);
     }
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
index 2281a44..7380261 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
@@ -19,7 +19,7 @@
 
 
 public interface IFIFOPageWriter {
-    public void write(ICachedPage page, IBufferCache bufferCache) throws HyracksDataException;
+    public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException;
 
-    void sync(int fileId, IBufferCache ibufferCache) throws HyracksDataException;
+    void sync(int fileId, BufferCache bufferCache) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/OrderedIndexExamplesTest.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
index 8c01105..94606ce 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
@@ -25,6 +25,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.junit.Test;
 
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -723,6 +724,7 @@
                         fail("Unexpected exception: " + e.getMessage());
                     }
                     // Success.
+
                     break;
                 } catch (TreeIndexDuplicateKeyException e2) {
                     if (j != i) {
@@ -732,7 +734,6 @@
                     break;
                 }
             }
-
             treeIndex.deactivate();
             treeIndex.destroy();
         }