Merge branch 'master' into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 76f411b..f71ee1d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -17,7 +17,6 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -47,10 +46,9 @@
 
     private int dataFrameCount;
     private int[] tPointers;
+    private int[] tPointersTemp;
     private int tupleCount;
 
-    private Random rand = new Random();
-
     public FrameSorter(IHyracksTaskContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) throws HyracksDataException {
@@ -119,8 +117,8 @@
             }
         }
         if (tupleCount > 0) {
-            shuffle(tPointers, 4, tupleCount);
-            sort(tPointers, 0, tupleCount);
+            tPointersTemp = new int[tPointers.length];
+            sort(0, tupleCount);
         }
     }
 
@@ -146,75 +144,73 @@
         }
     }
 
-    private void sort(int[] tPointers, int offset, int length) {
-        int m = offset + (length >> 1);
-        int mi = tPointers[m * 4];
-        int mj = tPointers[m * 4 + 1];
-        int mv = tPointers[m * 4 + 3];
-
-        int a = offset;
-        int b = a;
-        int c = offset + length - 1;
-        int d = c;
-        while (true) {
-            while (b <= c) {
-                int cmp = compare(tPointers, b, mi, mj, mv);
-                if (cmp > 0) {
-                    break;
+    private void sort(int offset, int length) {
+        int step = 1;
+        int len = length;
+        int end = offset + len;
+        /** bottom-up merge */
+        while (step < len) {
+            /** merge */
+            for (int i = offset; i < end; i += 2 * step) {
+                int next = i + step;
+                if (next < end) {
+                    merge(i, next, step, Math.min(step, end - next));
+                } else {
+                    System.arraycopy(tPointers, i * 4, tPointersTemp, i * 4, (end - i) * 4);
                 }
-                if (cmp == 0) {
-                    swap(tPointers, a++, b);
-                }
-                ++b;
             }
-            while (c >= b) {
-                int cmp = compare(tPointers, c, mi, mj, mv);
-                if (cmp < 0) {
-                    break;
-                }
-                if (cmp == 0) {
-                    swap(tPointers, c, d--);
-                }
-                --c;
+            /** prepare next phase merge */
+            step *= 2;
+            int[] tmp = tPointersTemp;
+            tPointersTemp = tPointers;
+            tPointers = tmp;
+        }
+    }
+
+    /** Merge two subarrays into one*/
+    private void merge(int start1, int start2, int len1, int len2) {
+        int targetPos = start1;
+        int pos1 = start1;
+        int pos2 = start2;
+        int end1 = start1 + len1 - 1;
+        int end2 = start2 + len2 - 1;
+        while (pos1 <= end1 && pos2 <= end2) {
+            int cmp = compare(pos1, pos2);
+            if (cmp <= 0) {
+                copy(pos1, targetPos);
+                pos1++;
+            } else {
+                copy(pos2, targetPos);
+                pos2++;
             }
-            if (b > c)
-                break;
-            swap(tPointers, b++, c--);
+            targetPos++;
         }
-
-        int s;
-        int n = offset + length;
-        s = Math.min(a - offset, b - a);
-        vecswap(tPointers, offset, b - s, s);
-        s = Math.min(d - c, n - d - 1);
-        vecswap(tPointers, b, n - s, s);
-
-        if ((s = b - a) > 1) {
-            sort(tPointers, offset, s);
+        if (pos1 <= end1) {
+            int rest = end1 - pos1 + 1;
+            System.arraycopy(tPointers, pos1 * 4, tPointersTemp, targetPos * 4, rest * 4);
         }
-        if ((s = d - c) > 1) {
-            sort(tPointers, n - s, s);
+        if (pos2 <= end2) {
+            int rest = end2 - pos2 + 1;
+            System.arraycopy(tPointers, pos2 * 4, tPointersTemp, targetPos * 4, rest * 4);
         }
     }
 
-    private void swap(int x[], int a, int b) {
-        for (int i = 0; i < 4; ++i) {
-            int t = x[a * 4 + i];
-            x[a * 4 + i] = x[b * 4 + i];
-            x[b * 4 + i] = t;
-        }
+    private void copy(int src, int dest) {
+        tPointersTemp[dest * 4] = tPointers[src * 4];
+        tPointersTemp[dest * 4 + 1] = tPointers[src * 4 + 1];
+        tPointersTemp[dest * 4 + 2] = tPointers[src * 4 + 2];
+        tPointersTemp[dest * 4 + 3] = tPointers[src * 4 + 3];
     }
 
-    private void vecswap(int x[], int a, int b, int n) {
-        for (int i = 0; i < n; i++, a++, b++) {
-            swap(x, a, b);
-        }
-    }
-
-    private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {
+    private int compare(int tp1, int tp2) {
         int i1 = tPointers[tp1 * 4];
         int j1 = tPointers[tp1 * 4 + 1];
         int v1 = tPointers[tp1 * 4 + 3];
+
+        int tp2i = tPointers[tp2 * 4];
+        int tp2j = tPointers[tp2 * 4 + 1];
+        int tp2v = tPointers[tp2 * 4 + 3];
+
         if (v1 != tp2v) {
             return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
         }
@@ -244,18 +240,6 @@
         return 0;
     }
 
-    private void shuffle(int[] tPointers, int interval, int tupleCount) {
-        for (int i = tupleCount; i > 1; i--) {
-            int next = rand.nextInt(i) * interval;
-            int target = (i - 1) * interval;
-            for (int j = 0; j < interval; j++) {
-                int drawn = tPointers[next + j];
-                tPointers[next + j] = tPointers[target + j];
-                tPointers[target + j] = drawn;
-            }
-        }
-    }
-
     public void close() {
         this.buffers.clear();
     }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeNonExistentKeyException.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeNonExistentKeyException.java
deleted file mode 100644
index 7237046..0000000
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeNonExistentKeyException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
-
-public class BTreeNonExistentKeyException extends BTreeException {
-
-    private static final long serialVersionUID = 1L;
-
-    public BTreeNonExistentKeyException(Exception e) {
-        super(e);
-    }
-
-    public BTreeNonExistentKeyException(String message) {
-        super(message);
-    }
-}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
index b503c8b..93cde3d 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
@@ -25,8 +25,6 @@
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.api.IPrefixSlotManager;
 import edu.uci.ics.hyracks.storage.am.btree.compressors.FieldPrefixCompressor;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
 import edu.uci.ics.hyracks.storage.am.btree.impls.FieldPrefixPrefixTupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.impls.FieldPrefixSlotManager;
@@ -37,6 +35,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleMode;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
@@ -370,7 +370,7 @@
         int tupleIndex = slotManager.decodeSecondSlotField(slot);
         // Error indicator is set if there is an exact match.
         if (tupleIndex == slotManager.getErrorIndicator()) {
-            throw new BTreeDuplicateKeyException("Trying to insert duplicate key into leaf node.");
+            throw new TreeIndexDuplicateKeyException("Trying to insert duplicate key into leaf node.");
         }
         return slot;
     }
@@ -382,7 +382,7 @@
         int tupleIndex = slotManager.decodeSecondSlotField(slot);
         // Error indicator is set if there is an exact match.
         if (tupleIndex == slotManager.getErrorIndicator()) {
-            throw new BTreeDuplicateKeyException("Trying to insert duplicate key into leaf node.");
+            throw new TreeIndexDuplicateKeyException("Trying to insert duplicate key into leaf node.");
         }
         return slot;
     }
@@ -411,7 +411,7 @@
         int tupleIndex = slotManager.decodeSecondSlotField(slot);
         // Error indicator is set if there is no exact match.
         if (tupleIndex == slotManager.getErrorIndicator()) {
-            throw new BTreeNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
+            throw new TreeIndexNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
         }
         return slot;
     }
@@ -423,7 +423,7 @@
         int tupleIndex = slotManager.decodeSecondSlotField(slot);
         // Error indicator is set if there is no exact match.
         if (tupleIndex == slotManager.getErrorIndicator()) {
-            throw new BTreeNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
+            throw new TreeIndexNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
         }
         return slot;
     }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
index 1974989..187cd52 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
@@ -20,14 +20,14 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
 import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.common.frames.TreeIndexNSMFrame;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleMode;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
@@ -72,7 +72,7 @@
                 FindTupleNoExactMatchPolicy.HIGHER_KEY);
         // Error indicator is set if there is an exact match.
         if (tupleIndex == slotManager.getErrorIndicator()) {
-            throw new BTreeDuplicateKeyException("Trying to insert duplicate key into leaf node.");
+            throw new TreeIndexDuplicateKeyException("Trying to insert duplicate key into leaf node.");
         }
         return tupleIndex;
     }
@@ -83,7 +83,7 @@
                 FindTupleNoExactMatchPolicy.HIGHER_KEY);
         // Error indicator is set if there is no exact match.
         if (tupleIndex == slotManager.getErrorIndicator() || tupleIndex == slotManager.getGreatestKeyIndicator()) {
-            throw new BTreeNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
+            throw new TreeIndexNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
         }
         return tupleIndex;
     }
@@ -121,7 +121,7 @@
                 FindTupleNoExactMatchPolicy.HIGHER_KEY);
         // Error indicator is set if there is no exact match.
         if (tupleIndex == slotManager.getErrorIndicator() || tupleIndex == slotManager.getGreatestKeyIndicator()) {
-            throw new BTreeNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
+            throw new TreeIndexNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
         }
         return tupleIndex;
     }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index 689843b..ff94040 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -33,7 +33,6 @@
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.api.ITupleAcceptor;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNotUpdateableException;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrame;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
@@ -53,6 +52,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.common.api.UnsortedInputException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
 import edu.uci.ics.hyracks.storage.am.common.impls.AbstractTreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -562,7 +563,7 @@
         // This means that there could be underflow, even an empty page that is
         // pointed to by an interior node.
         if (ctx.leafFrame.getTupleCount() == 0) {
-            throw new BTreeNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
+            throw new TreeIndexNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
         }
         int tupleIndex = ctx.leafFrame.findDeleteTupleIndex(tuple);
         ITupleReference beforeTuple = ctx.leafFrame.getMatchingKeyTuple(tuple, tupleIndex);
@@ -1024,9 +1025,12 @@
         protected void verifyInputTuple(ITupleReference tuple, ITupleReference prevTuple) throws IndexException,
                 HyracksDataException {
             // New tuple should be strictly greater than last tuple.
-            if (cmp.compare(tuple, prevTuple) <= 0) {
-                throw new UnsortedInputException(
-                        "Input stream given to BTree bulk load is not sorted or has duplicates.");
+            int cmpResult = cmp.compare(tuple, prevTuple);
+            if (cmpResult < 0) {
+                throw new UnsortedInputException("Input stream given to BTree bulk load is not sorted.");
+            }
+            if (cmpResult == 0) {
+                throw new TreeIndexDuplicateKeyException("Input stream given to BTree bulk load has duplicates.");
             }
         }
 
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 4a9cfb4..092fada 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -30,6 +30,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
 import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -97,7 +99,11 @@
 
                 switch (op) {
                     case INSERT: {
-                        indexAccessor.insert(tuple);
+                        try {
+                            indexAccessor.insert(tuple);
+                        } catch (TreeIndexDuplicateKeyException e) {
+                            // ingnore that exception to allow inserting existing keys which becomes an NoOp
+                        }
                         break;
                     }
                     case UPDATE: {
@@ -109,7 +115,11 @@
                         break;
                     }
                     case DELETE: {
-                        indexAccessor.delete(tuple);
+                        try {
+                            indexAccessor.delete(tuple);
+                        } catch (TreeIndexNonExistentKeyException e) {
+                            // ingnore that exception to allow deletions of non-existing keys
+                        }
                         break;
                     }
                     default: {
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexDuplicateKeyException.java
similarity index 70%
rename from hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
rename to hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexDuplicateKeyException.java
index cde5022..1767504 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexDuplicateKeyException.java
@@ -13,16 +13,18 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
+package edu.uci.ics.hyracks.storage.am.common.exceptions;
 
-public class BTreeDuplicateKeyException extends BTreeException {
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+
+public class TreeIndexDuplicateKeyException extends TreeIndexException {
     private static final long serialVersionUID = 1L;
 
-    public BTreeDuplicateKeyException(Exception e) {
+    public TreeIndexDuplicateKeyException(Exception e) {
         super(e);
     }
 
-    public BTreeDuplicateKeyException(String message) {
+    public TreeIndexDuplicateKeyException(String message) {
         super(message);
     }
 }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexNonExistentKeyException.java
similarity index 61%
copy from hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
copy to hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexNonExistentKeyException.java
index cde5022..8b62063 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexNonExistentKeyException.java
@@ -13,16 +13,19 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
+package edu.uci.ics.hyracks.storage.am.common.exceptions;
 
-public class BTreeDuplicateKeyException extends BTreeException {
-    private static final long serialVersionUID = 1L;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 
-    public BTreeDuplicateKeyException(Exception e) {
-        super(e);
-    }
+public class TreeIndexNonExistentKeyException extends TreeIndexException {
 
-    public BTreeDuplicateKeyException(String message) {
-        super(message);
-    }
+	private static final long serialVersionUID = 1L;
+
+	public TreeIndexNonExistentKeyException(Exception e) {
+		super(e);
+	}
+
+	public TreeIndexNonExistentKeyException(String message) {
+		super(message);
+	}
 }
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 19d40a0..a85a174 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -12,352 +12,356 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-

-package edu.uci.ics.hyracks.storage.am.common.impls;

-

-import java.util.ArrayList;

-

-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.api.io.FileReference;

-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;

-import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;

-import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;

-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;

-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;

-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;

-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;

-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;

-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;

-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;

-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;

-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;

-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;

-import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;

-import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;

-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;

-

-public abstract class AbstractTreeIndex implements ITreeIndex {

-

-    protected final static int rootPage = 1;

-

-    protected final IBufferCache bufferCache;

-    protected final IFileMapProvider fileMapProvider;

-    protected final IFreePageManager freePageManager;

-

-    protected final ITreeIndexFrameFactory interiorFrameFactory;

-    protected final ITreeIndexFrameFactory leafFrameFactory;

-

-    protected final IBinaryComparatorFactory[] cmpFactories;

-    protected final int fieldCount;

-

-    protected FileReference file;

-    protected int fileId = -1;

-

-    private boolean isActivated = false;

-

-    public AbstractTreeIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,

-            IFreePageManager freePageManager, ITreeIndexFrameFactory interiorFrameFactory,

-            ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount,

-            FileReference file) {

-        this.bufferCache = bufferCache;

-        this.fileMapProvider = fileMapProvider;

-        this.freePageManager = freePageManager;

-        this.interiorFrameFactory = interiorFrameFactory;

-        this.leafFrameFactory = leafFrameFactory;

-        this.cmpFactories = cmpFactories;

-        this.fieldCount = fieldCount;

-        this.file = file;

-    }

-

-    public synchronized void create() throws HyracksDataException {

-        if (isActivated) {

-            throw new HyracksDataException("Failed to create the index since it is activated.");

-        }

-

-        boolean fileIsMapped = false;

-        synchronized (fileMapProvider) {

-            fileIsMapped = fileMapProvider.isMapped(file);

-            if (!fileIsMapped) {

-                bufferCache.createFile(file);

-            }

-            fileId = fileMapProvider.lookupFileId(file);

-            try {

-                // Also creates the file if it doesn't exist yet.

-                bufferCache.openFile(fileId);

-            } catch (HyracksDataException e) {

-                // Revert state of buffer cache since file failed to open.

-                if (!fileIsMapped) {

-                    bufferCache.deleteFile(fileId, false);

-                }

-                throw e;

-            }

-        }

-

-        freePageManager.open(fileId);

-        initEmptyTree();

-        freePageManager.close();

-        bufferCache.closeFile(fileId);

-    }

-

-    private void initEmptyTree() throws HyracksDataException {

-        ITreeIndexFrame frame = leafFrameFactory.createFrame();

-        ITreeIndexMetaDataFrame metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();

-        freePageManager.init(metaFrame, rootPage);

-

-        ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);

-        rootNode.acquireWriteLatch();

-        try {

-            frame.setPage(rootNode);

-            frame.initBuffer((byte) 0);

-        } finally {

-            rootNode.releaseWriteLatch();

-            bufferCache.unpin(rootNode);

-        }

-    }

-

-    public synchronized void activate() throws HyracksDataException {

-        if (isActivated) {

-            throw new HyracksDataException("Failed to activate the index since it is already activated.");

-        }

-

-        boolean fileIsMapped = false;

-        synchronized (fileMapProvider) {

-            fileIsMapped = fileMapProvider.isMapped(file);

-            if (!fileIsMapped) {

-                bufferCache.createFile(file);

-            }

-            fileId = fileMapProvider.lookupFileId(file);

-            try {

-                // Also creates the file if it doesn't exist yet.

-                bufferCache.openFile(fileId);

-            } catch (HyracksDataException e) {

-                // Revert state of buffer cache since file failed to open.

-                if (!fileIsMapped) {

-                    bufferCache.deleteFile(fileId, false);

-                }

-                throw e;

-            }

-        }

-        freePageManager.open(fileId);

-

-        // TODO: Should probably have some way to check that the tree is physically consistent

-        // or that the file we just opened actually is a tree

-

-        isActivated = true;

-    }

-

-    public synchronized void deactivate() throws HyracksDataException {

-        if (!isActivated) {

-            throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");

-        }

-

-        bufferCache.closeFile(fileId);

-        freePageManager.close();

-

-        isActivated = false;

-    }

-

-    public synchronized void destroy() throws HyracksDataException {

-        if (isActivated) {

-            throw new HyracksDataException("Failed to destroy the index since it is activated.");

-        }

-

-        file.delete();

-        if (fileId == -1) {

-            return;

-        }

-

-        bufferCache.deleteFile(fileId, false);

-        fileId = -1;

-    }

-

-    public synchronized void clear() throws HyracksDataException {

-        if (!isActivated) {

-            throw new HyracksDataException("Failed to clear the index since it is not activated.");

-        }

-        initEmptyTree();

-    }

-

-    public boolean isEmptyTree(ITreeIndexFrame frame) throws HyracksDataException {

-        ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);

-        rootNode.acquireReadLatch();

-        try {

-            frame.setPage(rootNode);

-            if (frame.getLevel() == 0 && frame.getTupleCount() == 0) {

-                return true;

-            } else {

-                return false;

-            }

-        } finally {

-            rootNode.releaseReadLatch();

-            bufferCache.unpin(rootNode);

-        }

-    }

-

-    public byte getTreeHeight(ITreeIndexFrame frame) throws HyracksDataException {

-        ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);

-        rootNode.acquireReadLatch();

-        try {

-            frame.setPage(rootNode);

-            return frame.getLevel();

-        } finally {

-            rootNode.releaseReadLatch();

-            bufferCache.unpin(rootNode);

-        }

-    }

-

-    public int getFileId() {

-        return fileId;

-    }

-

-    public FileReference getFileReference() {

-        return file;

-    }

-

-    public IBufferCache getBufferCache() {

-        return bufferCache;

-    }

-

-    public ITreeIndexFrameFactory getInteriorFrameFactory() {

-        return interiorFrameFactory;

-    }

-

-    public ITreeIndexFrameFactory getLeafFrameFactory() {

-        return leafFrameFactory;

-    }

-

-    public IBinaryComparatorFactory[] getComparatorFactories() {

-        return cmpFactories;

-    }

-

-    public IFreePageManager getFreePageManager() {

-        return freePageManager;

-    }

-

-    public int getRootPageId() {

-        return rootPage;

-    }

-

-    public int getFieldCount() {

-        return fieldCount;

-    }

-

-    public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {

-        protected final MultiComparator cmp;

-        protected final int slotSize;

-        protected final int leafMaxBytes;

-        protected final int interiorMaxBytes;

-        protected final ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<NodeFrontier>();

-        protected final ITreeIndexMetaDataFrame metaFrame;

-        protected final ITreeIndexTupleWriter tupleWriter;

-        protected ITreeIndexFrame leafFrame;

-        protected ITreeIndexFrame interiorFrame;

-

-        public AbstractTreeIndexBulkLoader(float fillFactor) throws TreeIndexException, HyracksDataException {

-            leafFrame = leafFrameFactory.createFrame();

-            interiorFrame = interiorFrameFactory.createFrame();

-            metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();

-

-            if (!isEmptyTree(leafFrame)) {

-                throw new TreeIndexException("Cannot bulk-load a non-empty tree.");

-            }

-

-            this.cmp = MultiComparator.createIgnoreFieldLength(cmpFactories);

-

-            leafFrame.setMultiComparator(cmp);

-            interiorFrame.setMultiComparator(cmp);

-

-            tupleWriter = leafFrame.getTupleWriter();

-

-            NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());

-            leafFrontier.pageId = freePageManager.getFreePage(metaFrame);

-            leafFrontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId), true);

-            leafFrontier.page.acquireWriteLatch();

-

-            interiorFrame.setPage(leafFrontier.page);

-            interiorFrame.initBuffer((byte) 0);

-            interiorMaxBytes = (int) ((float) interiorFrame.getBuffer().capacity() * fillFactor);

-

-            leafFrame.setPage(leafFrontier.page);

-            leafFrame.initBuffer((byte) 0);

-            leafMaxBytes = (int) ((float) leafFrame.getBuffer().capacity() * fillFactor);

-            slotSize = leafFrame.getSlotSize();

-

-            nodeFrontiers.add(leafFrontier);

-        }

-

-        public abstract void add(ITupleReference tuple) throws IndexException, HyracksDataException;

-

-        protected void handleException() throws HyracksDataException {

-            // Unlatch and unpin pages.

-            for (NodeFrontier nodeFrontier : nodeFrontiers) {

-                nodeFrontier.page.releaseWriteLatch();

-                bufferCache.unpin(nodeFrontier.page);

-            }

-        }

-

-        @Override

-        public void end() throws HyracksDataException {

-            // copy the root generated from the bulk-load to *the* root page location

-            ICachedPage newRoot = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);

-            newRoot.acquireWriteLatch();

-            NodeFrontier lastNodeFrontier = nodeFrontiers.get(nodeFrontiers.size() - 1);

-            try {

-                System.arraycopy(lastNodeFrontier.page.getBuffer().array(), 0, newRoot.getBuffer().array(), 0,

-                        lastNodeFrontier.page.getBuffer().capacity());

-            } finally {

-                newRoot.releaseWriteLatch();

-                bufferCache.unpin(newRoot);

-

-                // register old root as a free page

-                freePageManager.addFreePage(metaFrame, lastNodeFrontier.pageId);

-

-                for (int i = 0; i < nodeFrontiers.size(); i++) {

-                    nodeFrontiers.get(i).page.releaseWriteLatch();

-                    bufferCache.unpin(nodeFrontiers.get(i).page);

-                }

-            }

-        }

-

-        protected void addLevel() throws HyracksDataException {

-            NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());

-            frontier.pageId = freePageManager.getFreePage(metaFrame);

-            frontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, frontier.pageId), true);

-            frontier.page.acquireWriteLatch();

-            frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());

-            interiorFrame.setPage(frontier.page);

-            interiorFrame.initBuffer((byte) nodeFrontiers.size());

-            nodeFrontiers.add(frontier);

-        }

-    }

-

-    public class TreeIndexInsertBulkLoader implements IIndexBulkLoader {

-        ITreeIndexAccessor accessor;

-

-        public TreeIndexInsertBulkLoader() throws HyracksDataException {

-            accessor = (ITreeIndexAccessor) createAccessor(NoOpOperationCallback.INSTANCE,

-                    NoOpOperationCallback.INSTANCE);

-        }

-

-        @Override

-        public void add(ITupleReference tuple) throws HyracksDataException {

-            try {

-                accessor.insert(tuple);

-            } catch (IndexException e) {

-                throw new HyracksDataException(e);

-            }

-        }

-

-        @Override

-        public void end() throws HyracksDataException {

-            // do nothing

-        }

-

-    }

-

-    @Override

-    public long getMemoryAllocationSize() {

-        return 0;

-    }

+
+package edu.uci.ics.hyracks.storage.am.common.impls;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public abstract class AbstractTreeIndex implements ITreeIndex {
+
+    protected final static int rootPage = 1;
+
+    protected final IBufferCache bufferCache;
+    protected final IFileMapProvider fileMapProvider;
+    protected final IFreePageManager freePageManager;
+
+    protected final ITreeIndexFrameFactory interiorFrameFactory;
+    protected final ITreeIndexFrameFactory leafFrameFactory;
+
+    protected final IBinaryComparatorFactory[] cmpFactories;
+    protected final int fieldCount;
+
+    protected FileReference file;
+    protected int fileId = -1;
+
+    private boolean isActivated = false;
+
+    public AbstractTreeIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+            IFreePageManager freePageManager, ITreeIndexFrameFactory interiorFrameFactory,
+            ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount,
+            FileReference file) {
+        this.bufferCache = bufferCache;
+        this.fileMapProvider = fileMapProvider;
+        this.freePageManager = freePageManager;
+        this.interiorFrameFactory = interiorFrameFactory;
+        this.leafFrameFactory = leafFrameFactory;
+        this.cmpFactories = cmpFactories;
+        this.fieldCount = fieldCount;
+        this.file = file;
+    }
+
+    public synchronized void create() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to create the index since it is activated.");
+        }
+
+        boolean fileIsMapped = false;
+        synchronized (fileMapProvider) {
+            fileIsMapped = fileMapProvider.isMapped(file);
+            if (!fileIsMapped) {
+                bufferCache.createFile(file);
+            }
+            fileId = fileMapProvider.lookupFileId(file);
+            try {
+                // Also creates the file if it doesn't exist yet.
+                bufferCache.openFile(fileId);
+            } catch (HyracksDataException e) {
+                // Revert state of buffer cache since file failed to open.
+                if (!fileIsMapped) {
+                    bufferCache.deleteFile(fileId, false);
+                }
+                throw e;
+            }
+        }
+
+        freePageManager.open(fileId);
+        initEmptyTree();
+        freePageManager.close();
+        bufferCache.closeFile(fileId);
+    }
+
+    private void initEmptyTree() throws HyracksDataException {
+        ITreeIndexFrame frame = leafFrameFactory.createFrame();
+        ITreeIndexMetaDataFrame metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
+        freePageManager.init(metaFrame, rootPage);
+
+        ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
+        rootNode.acquireWriteLatch();
+        try {
+            frame.setPage(rootNode);
+            frame.initBuffer((byte) 0);
+        } finally {
+            rootNode.releaseWriteLatch();
+            bufferCache.unpin(rootNode);
+        }
+    }
+
+    public synchronized void activate() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to activate the index since it is already activated.");
+        }
+
+        boolean fileIsMapped = false;
+        synchronized (fileMapProvider) {
+            fileIsMapped = fileMapProvider.isMapped(file);
+            if (!fileIsMapped) {
+                bufferCache.createFile(file);
+            }
+            fileId = fileMapProvider.lookupFileId(file);
+            try {
+                // Also creates the file if it doesn't exist yet.
+                bufferCache.openFile(fileId);
+            } catch (HyracksDataException e) {
+                // Revert state of buffer cache since file failed to open.
+                if (!fileIsMapped) {
+                    bufferCache.deleteFile(fileId, false);
+                }
+                throw e;
+            }
+        }
+        freePageManager.open(fileId);
+
+        // TODO: Should probably have some way to check that the tree is physically consistent
+        // or that the file we just opened actually is a tree
+
+        isActivated = true;
+    }
+
+    public synchronized void deactivate() throws HyracksDataException {
+        if (!isActivated) {
+            throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
+        }
+
+        bufferCache.closeFile(fileId);
+        freePageManager.close();
+
+        isActivated = false;
+    }
+
+    public synchronized void destroy() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to destroy the index since it is activated.");
+        }
+
+        file.delete();
+        if (fileId == -1) {
+            return;
+        }
+
+        bufferCache.deleteFile(fileId, false);
+        fileId = -1;
+    }
+
+    public synchronized void clear() throws HyracksDataException {
+        if (!isActivated) {
+            throw new HyracksDataException("Failed to clear the index since it is not activated.");
+        }
+        initEmptyTree();
+    }
+
+    public boolean isEmptyTree(ITreeIndexFrame frame) throws HyracksDataException {
+        ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
+        rootNode.acquireReadLatch();
+        try {
+            frame.setPage(rootNode);
+            if (frame.getLevel() == 0 && frame.getTupleCount() == 0) {
+                return true;
+            } else {
+                return false;
+            }
+        } finally {
+            rootNode.releaseReadLatch();
+            bufferCache.unpin(rootNode);
+        }
+    }
+
+    public byte getTreeHeight(ITreeIndexFrame frame) throws HyracksDataException {
+        ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
+        rootNode.acquireReadLatch();
+        try {
+            frame.setPage(rootNode);
+            return frame.getLevel();
+        } finally {
+            rootNode.releaseReadLatch();
+            bufferCache.unpin(rootNode);
+        }
+    }
+
+    public int getFileId() {
+        return fileId;
+    }
+
+    public FileReference getFileReference() {
+        return file;
+    }
+
+    public IBufferCache getBufferCache() {
+        return bufferCache;
+    }
+
+    public ITreeIndexFrameFactory getInteriorFrameFactory() {
+        return interiorFrameFactory;
+    }
+
+    public ITreeIndexFrameFactory getLeafFrameFactory() {
+        return leafFrameFactory;
+    }
+
+    public IBinaryComparatorFactory[] getComparatorFactories() {
+        return cmpFactories;
+    }
+
+    public IFreePageManager getFreePageManager() {
+        return freePageManager;
+    }
+
+    public int getRootPageId() {
+        return rootPage;
+    }
+
+    public int getFieldCount() {
+        return fieldCount;
+    }
+
+    public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {
+        protected final MultiComparator cmp;
+        protected final int slotSize;
+        protected final int leafMaxBytes;
+        protected final int interiorMaxBytes;
+        protected final ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<NodeFrontier>();
+        protected final ITreeIndexMetaDataFrame metaFrame;
+        protected final ITreeIndexTupleWriter tupleWriter;
+        protected ITreeIndexFrame leafFrame;
+        protected ITreeIndexFrame interiorFrame;
+        private boolean releasedLatches;
+
+        public AbstractTreeIndexBulkLoader(float fillFactor) throws TreeIndexException, HyracksDataException {
+            leafFrame = leafFrameFactory.createFrame();
+            interiorFrame = interiorFrameFactory.createFrame();
+            metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
+
+            if (!isEmptyTree(leafFrame)) {
+                throw new TreeIndexException("Cannot bulk-load a non-empty tree.");
+            }
+
+            this.cmp = MultiComparator.createIgnoreFieldLength(cmpFactories);
+
+            leafFrame.setMultiComparator(cmp);
+            interiorFrame.setMultiComparator(cmp);
+
+            tupleWriter = leafFrame.getTupleWriter();
+
+            NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
+            leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
+            leafFrontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId), true);
+            leafFrontier.page.acquireWriteLatch();
+
+            interiorFrame.setPage(leafFrontier.page);
+            interiorFrame.initBuffer((byte) 0);
+            interiorMaxBytes = (int) ((float) interiorFrame.getBuffer().capacity() * fillFactor);
+
+            leafFrame.setPage(leafFrontier.page);
+            leafFrame.initBuffer((byte) 0);
+            leafMaxBytes = (int) ((float) leafFrame.getBuffer().capacity() * fillFactor);
+            slotSize = leafFrame.getSlotSize();
+
+            nodeFrontiers.add(leafFrontier);
+        }
+
+        public abstract void add(ITupleReference tuple) throws IndexException, HyracksDataException;
+
+        protected void handleException() throws HyracksDataException {
+            // Unlatch and unpin pages.
+            for (NodeFrontier nodeFrontier : nodeFrontiers) {
+                nodeFrontier.page.releaseWriteLatch();
+                bufferCache.unpin(nodeFrontier.page);
+            }
+            releasedLatches = true;
+        }
+
+        @Override
+        public void end() throws HyracksDataException {
+            // copy the root generated from the bulk-load to *the* root page location
+            ICachedPage newRoot = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
+            newRoot.acquireWriteLatch();
+            NodeFrontier lastNodeFrontier = nodeFrontiers.get(nodeFrontiers.size() - 1);
+            try {
+                System.arraycopy(lastNodeFrontier.page.getBuffer().array(), 0, newRoot.getBuffer().array(), 0,
+                        lastNodeFrontier.page.getBuffer().capacity());
+            } finally {
+                newRoot.releaseWriteLatch();
+                bufferCache.unpin(newRoot);
+
+                // register old root as a free page
+                freePageManager.addFreePage(metaFrame, lastNodeFrontier.pageId);
+
+                if (!releasedLatches) {
+                    for (int i = 0; i < nodeFrontiers.size(); i++) {
+                        nodeFrontiers.get(i).page.releaseWriteLatch();
+                        bufferCache.unpin(nodeFrontiers.get(i).page);
+                    }
+                }
+            }
+        }
+
+        protected void addLevel() throws HyracksDataException {
+            NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
+            frontier.pageId = freePageManager.getFreePage(metaFrame);
+            frontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, frontier.pageId), true);
+            frontier.page.acquireWriteLatch();
+            frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
+            interiorFrame.setPage(frontier.page);
+            interiorFrame.initBuffer((byte) nodeFrontiers.size());
+            nodeFrontiers.add(frontier);
+        }
+    }
+
+    public class TreeIndexInsertBulkLoader implements IIndexBulkLoader {
+        ITreeIndexAccessor accessor;
+
+        public TreeIndexInsertBulkLoader() throws HyracksDataException {
+            accessor = (ITreeIndexAccessor) createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+        }
+
+        @Override
+        public void add(ITupleReference tuple) throws HyracksDataException {
+            try {
+                accessor.insert(tuple);
+            } catch (IndexException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void end() throws HyracksDataException {
+            // do nothing
+        }
+
+    }
+
+    @Override
+    public long getMemoryAllocationSize() {
+        return 0;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 3557d73..59af99e 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
 import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
@@ -46,6 +45,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -279,7 +279,7 @@
                 memCursor.next();
                 LSMBTreeTupleReference lsmbtreeTuple = (LSMBTreeTupleReference) memCursor.getTuple();
                 if (!lsmbtreeTuple.isAntimatter()) {
-                    throw new BTreeDuplicateKeyException("Failed to insert key since key already exists.");
+                    throw new TreeIndexDuplicateKeyException("Failed to insert key since key already exists.");
                 } else {
                     memCursor.close();
                     ctx.memBTreeAccessor.upsertIfConditionElseInsert(tuple, AntimatterAwareTupleAcceptor.INSTANCE);
@@ -299,7 +299,7 @@
         search(ctx, searchCursor, predicate);
         try {
             if (searchCursor.hasNext()) {
-                throw new BTreeDuplicateKeyException("Failed to insert key since key already exists.");
+                throw new TreeIndexDuplicateKeyException("Failed to insert key since key already exists.");
             }
         } finally {
             searchCursor.close();
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 3eae5a7..244dfa0 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
 import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
 import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
@@ -43,6 +42,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IVirtualFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -305,7 +305,7 @@
                 ctx.keysOnlyTuple.reset(tuple);
                 try {
                     ctx.deletedKeysBTreeAccessor.insert(ctx.keysOnlyTuple);
-                } catch (BTreeDuplicateKeyException e) {
+                } catch (TreeIndexDuplicateKeyException e) {
                     // Key has already been deleted.
                 }
                 break;
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index 222f4de..c70c2d5 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -19,9 +19,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
@@ -33,6 +31,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
@@ -117,7 +117,7 @@
             ITupleReference insertTuple = ctx.tupleIter.getTuple();
             try {
                 btreeAccessor.insert(insertTuple);
-            } catch (BTreeDuplicateKeyException e) {
+            } catch (TreeIndexDuplicateKeyException e) {
                 // This exception may be caused by duplicate tokens in the same insert "document".
                 // We ignore such duplicate tokens in all inverted-index implementations, hence
                 // we can safely ignore this exception.
@@ -134,7 +134,7 @@
             ITupleReference deleteTuple = ctx.tupleIter.getTuple();
             try {
                 btreeAccessor.delete(deleteTuple);
-            } catch (BTreeNonExistentKeyException e) {
+            } catch (TreeIndexNonExistentKeyException e) {
                 // Ignore this exception, since a document may have duplicate tokens.
             }
         }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index e3e690b..285010c 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -23,8 +23,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -36,6 +34,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IVirtualFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -306,7 +306,7 @@
             if (foundTupleInMemoryBTree) {
                 try {
                     ctx.memBTreeAccessor.delete(tuple);
-                } catch (BTreeNonExistentKeyException e) {
+                } catch (TreeIndexNonExistentKeyException e) {
                     // Tuple has been deleted in the meantime. Do nothing.
                     // This normally shouldn't happen if we are dealing with
                     // good citizens since LSMRTree is used as a secondary
@@ -320,7 +320,7 @@
         } else {
             try {
                 ctx.memBTreeAccessor.insert(tuple);
-            } catch (BTreeDuplicateKeyException e) {
+            } catch (TreeIndexDuplicateKeyException e) {
                 // Do nothing, because one delete tuple is enough to indicate
                 // that all the corresponding insert tuples are deleted
             }
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
index eafeff2..0a70185 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
@@ -46,6 +46,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.common.api.UnsortedInputException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.common.impls.TreeIndexDiskOrderScanCursor;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 
@@ -140,9 +141,9 @@
 
     /**
      * This test the btree page split. Originally this test didn't pass since
-     * the btree was spliting by cardinality and not size. Thus, we might end
-     * up with a situation where there is not enough space to insert the new
-     * tuple after the split which will throw an error and the split won't be
+     * the btree was spliting by cardinality and not size. Thus, we might end up
+     * with a situation where there is not enough space to insert the new tuple
+     * after the split which will throw an error and the split won't be
      * propagated to upper level; thus, the tree is corrupted. Now, it split
      * page by size. The correct behavior on abnormally large keys/values.
      */
@@ -716,6 +717,12 @@
                     }
                     // Success.
                     break;
+                } catch (TreeIndexDuplicateKeyException e2) {
+                    if (j != i) {
+                        fail("Unexpected exception: " + e2.getMessage());
+                    }
+                    // Success.
+                    break;
                 }
             }
 
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
index 5b7b050..9de217b 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
@@ -35,7 +35,6 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
 import edu.uci.ics.hyracks.storage.am.common.CheckTuple;
@@ -44,6 +43,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 
 @SuppressWarnings("rawtypes")
@@ -205,7 +205,7 @@
                 // Set expected values. Do this only after insertion succeeds
                 // because we ignore duplicate keys.
                 ctx.insertCheckTuple(createStringCheckTuple(fieldValues, ctx.getKeyFieldCount()), ctx.getCheckTuples());
-            } catch (BTreeDuplicateKeyException e) {
+            } catch (TreeIndexDuplicateKeyException e) {
                 // Ignore duplicate key insertions.
             }
         }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java
index 262e21c..22d3e6a 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java
@@ -19,8 +19,6 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNotUpdateableException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -31,6 +29,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 
 public class BTreeTestWorker extends AbstractIndexTestWorker {
@@ -60,7 +60,7 @@
             case INSERT:
                 try {
                     accessor.insert(tuple);
-                } catch (BTreeDuplicateKeyException e) {
+                } catch (TreeIndexDuplicateKeyException e) {
                     // Ignore duplicate keys, since we get random tuples.
                 }
                 break;
@@ -74,7 +74,7 @@
                 deleteTuple.reset(deleteTb.getFieldEndOffsets(), deleteTb.getByteArray());
                 try {
                     accessor.delete(deleteTuple);
-                } catch (BTreeNonExistentKeyException e) {
+                } catch (TreeIndexNonExistentKeyException e) {
                     // Ignore non-existant keys, since we get random tuples.
                 }
                 break;
@@ -82,7 +82,7 @@
             case UPDATE:
                 try {
                     accessor.update(tuple);
-                } catch (BTreeNonExistentKeyException e) {
+                } catch (TreeIndexNonExistentKeyException e) {
                     // Ignore non-existant keys, since we get random tuples.
                 } catch (BTreeNotUpdateableException e) {
                     // Ignore not updateable exception due to numKeys == numFields.
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 8a37b4e..4a96131 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -19,8 +19,6 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNotUpdateableException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.AbstractIndexTestWorker;
@@ -30,6 +28,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree.LSMBTreeAccessor;
@@ -60,7 +60,7 @@
             case INSERT:
                 try {
                     accessor.insert(tuple);
-                } catch (BTreeDuplicateKeyException e) {
+                } catch (TreeIndexDuplicateKeyException e) {
                     // Ignore duplicate keys, since we get random tuples.
                 }
                 break;
@@ -74,7 +74,7 @@
                 deleteTuple.reset(deleteTb.getFieldEndOffsets(), deleteTb.getByteArray());
                 try {
                     accessor.delete(deleteTuple);
-                } catch (BTreeNonExistentKeyException e) {
+                } catch (TreeIndexNonExistentKeyException e) {
                     // Ignore non-existant keys, since we get random tuples.
                 }
                 break;
@@ -82,7 +82,7 @@
             case UPDATE:
                 try {
                     accessor.update(tuple);
-                } catch (BTreeNonExistentKeyException e) {
+                } catch (TreeIndexNonExistentKeyException e) {
                     // Ignore non-existant keys, since we get random tuples.
                 } catch (BTreeNotUpdateableException e) {
                     // Ignore not updateable exception due to numKeys == numFields.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index c52130d..26cb8d0 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -35,7 +36,6 @@
 import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.JobStateUtils;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
 
 /**
  * User applications should all inherit {@link Vertex}, and implement their own
@@ -270,7 +270,7 @@
             delegate.setVertex(this);
         }
         destEdgeList.clear();
-        long edgeMapSize = SerDeUtils.readVLong(in);
+        long edgeMapSize = WritableUtils.readVLong(in);
         for (long i = 0; i < edgeMapSize; ++i) {
             Edge<I, E> edge = allocateEdge();
             edge.setConf(getContext().getConfiguration());
@@ -278,7 +278,7 @@
             addEdge(edge);
         }
         msgList.clear();
-        long msgListSize = SerDeUtils.readVLong(in);
+        long msgListSize = WritableUtils.readVLong(in);
         for (long i = 0; i < msgListSize; ++i) {
             M msg = allocateMessage();
             msg.readFields(in);
@@ -297,11 +297,11 @@
         if (vertexValue != null) {
             vertexValue.write(out);
         }
-        SerDeUtils.writeVLong(out, destEdgeList.size());
+        WritableUtils.writeVLong(out, destEdgeList.size());
         for (Edge<I, E> edge : destEdgeList) {
             edge.write(out);
         }
-        SerDeUtils.writeVLong(out, msgList.size());
+        WritableUtils.writeVLong(out, msgList.size());
         for (M msg : msgList) {
             msg.write(out);
         }
@@ -618,7 +618,6 @@
      * Terminate the current partition where the current vertex stays in.
      * This will immediately take effect and the upcoming vertice in the
      * same partition cannot be processed.
-     * 
      */
     protected final void terminatePartition() {
         voteToHalt();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
index 25b07ff..a4336a3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
@@ -40,47 +40,4 @@
         object.readFields(input);
     }
 
-    public static long readVLong(DataInput in) throws IOException {
-        int vLen = 0;
-        long value = 0L;
-        while (true) {
-            byte b = (byte) in.readByte();
-            ++vLen;
-            value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
-            if ((b & 0x80) == 0) {
-                break;
-            }
-        }
-        return value;
-    }
-
-    public static void writeVLong(DataOutput out, long value) throws IOException {
-        long data = value;
-        do {
-            byte b = (byte) (data & 0x7f);
-            data >>= 7;
-            if (data != 0) {
-                b |= 0x80;
-            }
-            out.write(b);
-        } while (data != 0);
-    }
-
-    public static long readVLong(byte[] data, int start, int length) {
-        int vLen = 0;
-        long value = 0L;
-        while (true) {
-            byte b = (byte) data[start];
-            ++vLen;
-            value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
-            if ((b & 0x80) == 0) {
-                break;
-            }
-            ++start;
-        }
-        if (vLen != length)
-            throw new IllegalStateException("length mismatch -- vLen:" + vLen + " length:" + length);
-        return value;
-    }
-
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 2d4064b..5325397 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -204,6 +204,7 @@
 
     private void execute(DeploymentId deploymentId, JobSpecification job) throws Exception {
         job.setUseConnectorPolicyForScheduling(false);
+        job.setMaxReattempts(0);
         JobId jobId = hcc.startJob(deploymentId, job,
                 profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
         hcc.waitForCompletion(jobId);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 478ac07..42359eb 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -241,7 +241,7 @@
         typeTraits[1] = new TypeTraits(false);
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, false,
+                sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
                 getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 49309ec..db18e53 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.TreeMap;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 
@@ -197,7 +198,8 @@
     public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
         try {
             IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+            Map<String, NodeControllerInfo> ncNameToNcInfos = new TreeMap<String, NodeControllerInfo>();
+            ncNameToNcInfos.putAll(hcc.getNodeControllerInfos());
             NCs = new String[ncNameToNcInfos.size()];
             ipToNcMapping = new HashMap<String, List<String>>();
             int i = 0;
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 150bd8b..6386bdb 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -17,19 +17,15 @@
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameConstants;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public class TupleDeserializer {
-    private static final Logger LOGGER = Logger.getLogger(TupleDeserializer.class.getName());
-
+    private static String ERROR_MSG = "Out-of-bound read in your Writable implementations of types for vertex id, vertex value, edge value or message --- check your readFields and write implmenetation";
     private Object[] record;
     private RecordDescriptor recordDescriptor;
     private ResetableByteArrayInputStream bbis;
@@ -43,132 +39,116 @@
     }
 
     public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
-        for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
-            byte[] data = tupleRef.getFieldData(i);
-            int offset = tupleRef.getFieldStart(i);
-            bbis.setByteArray(data, offset);
+        try {
+            for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
+                byte[] data = tupleRef.getFieldData(i);
+                int offset = tupleRef.getFieldStart(i);
+                int len = tupleRef.getFieldLength(i);
+                bbis.setByteArray(data, offset);
 
-            Object instance = recordDescriptor.getFields()[i].deserialize(di);
-            if (LOGGER.isLoggable(Level.FINEST)) {
-                LOGGER.finest(i + " " + instance);
-            }
-            record[i] = instance;
-            if (FrameConstants.DEBUG_FRAME_IO) {
-                try {
-                    if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
-                        throw new HyracksDataException("Field magic mismatch");
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
+                int availableBefore = bbis.available();
+                Object instance = recordDescriptor.getFields()[i].deserialize(di);
+                int availableAfter = bbis.available();
+                if (availableBefore - availableAfter > len) {
+                    throw new IllegalStateException(ERROR_MSG);
                 }
+
+                record[i] = instance;
             }
+            return record;
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
         }
-        return record;
     }
 
     public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
             throws HyracksDataException {
-        byte[] data = left.getBuffer().array();
-        int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
-        int leftFieldCount = left.getFieldCount();
-        int fStart = tStart;
-        for (int i = 0; i < leftFieldCount; ++i) {
-            /**
-             * reset the input
-             */
-            fStart = tStart + left.getFieldStartOffset(tIndex, i);
-            bbis.setByteArray(data, fStart);
+        try {
+            byte[] data = left.getBuffer().array();
+            int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
+            int leftFieldCount = left.getFieldCount();
+            int fStart = tStart;
+            for (int i = 0; i < leftFieldCount; ++i) {
+                /**
+                 * reset the input
+                 */
+                fStart = tStart + left.getFieldStartOffset(tIndex, i);
+                int fieldLength = left.getFieldLength(tIndex, i);
+                bbis.setByteArray(data, fStart);
 
-            /**
-             * do deserialization
-             */
-            Object instance = recordDescriptor.getFields()[i].deserialize(di);
-            if (LOGGER.isLoggable(Level.FINEST)) {
-                LOGGER.finest(i + " " + instance);
-            }
-            record[i] = instance;
-            if (FrameConstants.DEBUG_FRAME_IO) {
-                try {
-                    if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
-                        throw new HyracksDataException("Field magic mismatch");
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-        for (int i = leftFieldCount; i < record.length; ++i) {
-            byte[] rightData = right.getFieldData(i - leftFieldCount);
-            int rightOffset = right.getFieldStart(i - leftFieldCount);
-            bbis.setByteArray(rightData, rightOffset);
+                /**
+                 * do deserialization
+                 */
+                int availableBefore = bbis.available();
+                Object instance = recordDescriptor.getFields()[i].deserialize(di);
+                int availableAfter = bbis.available();
+                if (availableBefore - availableAfter > fieldLength) {
+                    throw new IllegalStateException(ERROR_MSG);
 
-            Object instance = recordDescriptor.getFields()[i].deserialize(di);
-            if (LOGGER.isLoggable(Level.FINEST)) {
-                LOGGER.finest(i + " " + instance);
-            }
-            record[i] = instance;
-            if (FrameConstants.DEBUG_FRAME_IO) {
-                try {
-                    if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
-                        throw new HyracksDataException("Field magic mismatch");
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
                 }
+                record[i] = instance;
             }
+            for (int i = leftFieldCount; i < record.length; ++i) {
+                byte[] rightData = right.getFieldData(i - leftFieldCount);
+                int rightOffset = right.getFieldStart(i - leftFieldCount);
+                int len = right.getFieldLength(i - leftFieldCount);
+                bbis.setByteArray(rightData, rightOffset);
+
+                int availableBefore = bbis.available();
+                Object instance = recordDescriptor.getFields()[i].deserialize(di);
+                int availableAfter = bbis.available();
+                if (availableBefore - availableAfter > len) {
+                    throw new IllegalStateException(ERROR_MSG);
+                }
+                record[i] = instance;
+            }
+            return record;
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
         }
-        return record;
     }
 
     public Object[] deserializeRecord(ArrayTupleBuilder tb, ITupleReference right) throws HyracksDataException {
-        byte[] data = tb.getByteArray();
-        int[] offset = tb.getFieldEndOffsets();
-        int start = 0;
-        for (int i = 0; i < offset.length; ++i) {
-            /**
-             * reset the input
-             */
-            bbis.setByteArray(data, start);
-            start = offset[i];
+        try {
+            byte[] data = tb.getByteArray();
+            int[] offset = tb.getFieldEndOffsets();
+            int start = 0;
+            for (int i = 0; i < offset.length; ++i) {
+                /**
+                 * reset the input
+                 */
+                bbis.setByteArray(data, start);
+                start = offset[i];
+                int fieldLength = i == 0 ? offset[0] : offset[i] - offset[i - 1];
 
-            /**
-             * do deserialization
-             */
-            Object instance = recordDescriptor.getFields()[i].deserialize(di);
-            if (LOGGER.isLoggable(Level.FINEST)) {
-                LOGGER.finest(i + " " + instance);
-            }
-            record[i] = instance;
-            if (FrameConstants.DEBUG_FRAME_IO) {
-                try {
-                    if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
-                        throw new HyracksDataException("Field magic mismatch");
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
+                /**
+                 * do deserialization
+                 */
+                int availableBefore = bbis.available();
+                Object instance = recordDescriptor.getFields()[i].deserialize(di);
+                int availableAfter = bbis.available();
+                if (availableBefore - availableAfter > fieldLength) {
+                    throw new IllegalStateException(ERROR_MSG);
                 }
+                record[i] = instance;
             }
-        }
-        for (int i = offset.length; i < record.length; ++i) {
-            byte[] rightData = right.getFieldData(i - offset.length);
-            int rightOffset = right.getFieldStart(i - offset.length);
-            bbis.setByteArray(rightData, rightOffset);
+            for (int i = offset.length; i < record.length; ++i) {
+                byte[] rightData = right.getFieldData(i - offset.length);
+                int rightOffset = right.getFieldStart(i - offset.length);
+                bbis.setByteArray(rightData, rightOffset);
+                int fieldLength = right.getFieldLength(i - offset.length);
 
-            Object instance = recordDescriptor.getFields()[i].deserialize(di);
-            if (LOGGER.isLoggable(Level.FINEST)) {
-                LOGGER.finest(i + " " + instance);
-            }
-            record[i] = instance;
-            if (FrameConstants.DEBUG_FRAME_IO) {
-                try {
-                    if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
-                        throw new HyracksDataException("Field magic mismatch");
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
+                int availableBefore = bbis.available();
+                Object instance = recordDescriptor.getFields()[i].deserialize(di);
+                int availableAfter = bbis.available();
+                if (availableBefore - availableAfter > fieldLength) {
+                    throw new IllegalStateException(ERROR_MSG);
                 }
+                record[i] = instance;
             }
+            return record;
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
         }
-        return record;
     }
 }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index ea1e02e..4421695 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 
 /**
  * The buffer to hold updates.
@@ -96,7 +97,12 @@
             fta.reset(buffer);
             for (int j = 0; j < fta.getTupleCount(); j++) {
                 tuple.reset(fta, j);
-                bta.update(tuple);
+                try {
+                    bta.update(tuple);
+                } catch (TreeIndexNonExistentKeyException e) {
+                    // ignore non-existent key exception
+                    bta.insert(tuple);
+                }
             }
         }
 
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
index 7d824ea..90065c2 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
@@ -14,8 +14,13 @@
  */
 package edu.uci.ics.pregelix.example.data;
 
+import java.io.DataInput;
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.WritableUtils;
+
 import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
 
 /**
  * @author yingyib
@@ -26,34 +31,42 @@
     private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
     private static final int NEGATIVE_LONG_MASK = (0 << 30);
 
+    private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+    private DataInput dis = new DataInputStream(bis);
+
     @Override
     public int getNormalizedKey(byte[] bytes, int start, int length) {
-        long value = SerDeUtils.readVLong(bytes, start, length);
-        int highValue = (int) (value >> 32);
-        if (highValue > 0) {
-            /**
-             * larger than Integer.MAX
-             */
-            int highNmk = getKey(highValue);
-            highNmk >>= 2;
-            highNmk |= POSTIVE_LONG_MASK;
-            return highNmk;
-        } else if (highValue == 0) {
-            /**
-             * smaller than Integer.MAX but >=0
-             */
-            int lowNmk = (int) value;
-            lowNmk >>= 2;
-            lowNmk |= NON_NEGATIVE_INT_MASK;
-            return lowNmk;
-        } else {
-            /**
-             * less than 0; TODO: have not optimized for that
-             */
-            int highNmk = getKey(highValue);
-            highNmk >>= 2;
-            highNmk |= NEGATIVE_LONG_MASK;
-            return highNmk;
+        try {
+            bis.setByteArray(bytes, start);
+            long value = WritableUtils.readVLong(dis);
+            int highValue = (int) (value >> 32);
+            if (highValue > 0) {
+                /**
+                 * larger than Integer.MAX
+                 */
+                int highNmk = getKey(highValue);
+                highNmk >>= 2;
+                highNmk |= POSTIVE_LONG_MASK;
+                return highNmk;
+            } else if (highValue == 0) {
+                /**
+                 * smaller than Integer.MAX but >=0
+                 */
+                int lowNmk = (int) value;
+                lowNmk >>= 2;
+                lowNmk |= NON_NEGATIVE_INT_MASK;
+                return lowNmk;
+            } else {
+                /**
+                 * less than 0; TODO: have not optimized for that
+                 */
+                int highNmk = getKey(highValue);
+                highNmk >>= 2;
+                highNmk |= NEGATIVE_LONG_MASK;
+                return highNmk;
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
         }
     }
 
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
index ec1109f..1c5f629 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -16,14 +16,14 @@
 package edu.uci.ics.pregelix.example.io;
 
 import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+import java.io.DataInputStream;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
 
 import edu.uci.ics.pregelix.api.io.WritableSizable;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
 
 /**
  * A WritableComparable for longs in a variable-length format. Such values take
@@ -32,17 +32,7 @@
  * @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
  */
 @SuppressWarnings("rawtypes")
-public class VLongWritable implements WritableComparable, WritableSizable {
-    private static long ONE_BYTE_MAX = 2 ^ 7 - 1;
-    private static long TWO_BYTE_MAX = 2 ^ 14 - 1;
-    private static long THREE_BYTE_MAX = 2 ^ 21 - 1;
-    private static long FOUR_BYTE_MAX = 2 ^ 28 - 1;
-    private static long FIVE_BYTE_MAX = 2 ^ 35 - 1;;
-    private static long SIX_BYTE_MAX = 2 ^ 42 - 1;;
-    private static long SEVEN_BYTE_MAX = 2 ^ 49 - 1;;
-    private static long EIGHT_BYTE_MAX = 2 ^ 54 - 1;;
-
-    private long value;
+public class VLongWritable extends org.apache.hadoop.io.VLongWritable implements WritableSizable {
 
     public VLongWritable() {
     }
@@ -52,78 +42,46 @@
     }
 
     public int sizeInBytes() {
-        if (value >= 0 && value <= ONE_BYTE_MAX) {
+        long i = get();
+        if (i >= -112 && i <= 127) {
             return 1;
-        } else if (value > ONE_BYTE_MAX && value <= TWO_BYTE_MAX) {
-            return 2;
-        } else if (value > TWO_BYTE_MAX && value <= THREE_BYTE_MAX) {
-            return 3;
-        } else if (value > THREE_BYTE_MAX && value <= FOUR_BYTE_MAX) {
-            return 4;
-        } else if (value > FOUR_BYTE_MAX && value <= FIVE_BYTE_MAX) {
-            return 5;
-        } else if (value > FIVE_BYTE_MAX && value <= SIX_BYTE_MAX) {
-            return 6;
-        } else if (value > SIX_BYTE_MAX && value <= SEVEN_BYTE_MAX) {
-            return 7;
-        } else if (value > SEVEN_BYTE_MAX && value <= EIGHT_BYTE_MAX) {
-            return 8;
-        } else {
-            return 9;
         }
-    }
 
-    /** Set the value of this LongWritable. */
-    public void set(long value) {
-        this.value = value;
-    }
+        int len = -112;
+        if (i < 0) {
+            i ^= -1L; // take one's complement'
+            len = -120;
+        }
 
-    /** Return the value of this LongWritable. */
-    public long get() {
-        return value;
-    }
+        long tmp = i;
+        while (tmp != 0) {
+            tmp = tmp >> 8;
+            len--;
+        }
 
-    public void readFields(DataInput in) throws IOException {
-        value = SerDeUtils.readVLong(in);
-    }
-
-    public void write(DataOutput out) throws IOException {
-        SerDeUtils.writeVLong(out, value);
-    }
-
-    /** Returns true iff <code>o</code> is a VLongWritable with the same value. */
-    public boolean equals(Object o) {
-        if (!(o instanceof VLongWritable))
-            return false;
-        VLongWritable other = (VLongWritable) o;
-        return this.value == other.value;
-    }
-
-    public int hashCode() {
-        return (int) value;
-    }
-
-    /** Compares two VLongWritables. */
-    public int compareTo(Object o) {
-        long thisValue = this.value;
-        long thatValue = ((VLongWritable) o).value;
-        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
-    }
-
-    public String toString() {
-        return Long.toString(value);
+        len = (len < -120) ? -(len + 120) : -(len + 112);
+        return len + 1;
     }
 
     /** A Comparator optimized for LongWritable. */
     public static class Comparator extends WritableComparator {
+        private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+        private DataInput dis = new DataInputStream(bis);
+
         public Comparator() {
             super(VLongWritable.class);
         }
 
         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-            long thisValue = SerDeUtils.readVLong(b1, s1, l1);
-            long thatValue = SerDeUtils.readVLong(b2, s2, l2);
-            return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+            try {
+                bis.setByteArray(b1, s1);
+                long thisValue = WritableUtils.readVLong(dis);
+                bis.setByteArray(b2, s2);
+                long thatValue = WritableUtils.readVLong(dis);
+                return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
         }
     }
 
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
index 196b114..638011b 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
@@ -41,10 +41,13 @@
     public void testVLong() throws Exception {
         Random rand = new Random(System.currentTimeMillis());
         MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+        msgList.add(new VLongWritable(Long.MAX_VALUE));
+        msgList.add(new VLongWritable(Long.MIN_VALUE));
+        msgList.add(new VLongWritable(-1));
         for (int i = 0; i < 1000000; i++) {
             msgList.add(new VLongWritable(Math.abs(rand.nextLong())));
         }
-        verifySizeEstimation(msgList);
+        verifyExactSizeEstimation(msgList);
     }
 
     @Test
@@ -96,7 +99,7 @@
         }
         verifySizeEstimation(msgList);
     }
-    
+
     @Test
     public void testNull() throws Exception {
         MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
@@ -105,7 +108,7 @@
         }
         verifySizeEstimation(msgList);
     }
-    
+
     @Test
     public void testVInt() throws Exception {
         Random rand = new Random(System.currentTimeMillis());
@@ -115,7 +118,7 @@
         }
         verifySizeEstimation(msgList);
     }
-    
+
     @Test
     public void testInt() throws Exception {
         Random rand = new Random(System.currentTimeMillis());
@@ -148,4 +151,26 @@
         }
     }
 
+    private void verifyExactSizeEstimation(MsgList<WritableSizable> msgList) throws Exception {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutput dos = new DataOutputStream(bos);
+        int accumulatedSize = 5;
+        for (int i = 0; i < msgList.size(); i++) {
+            bos.reset();
+            WritableSizable value = msgList.get(i);
+            value.write(dos);
+            if (value.sizeInBytes() != bos.size()) {
+                throw new Exception(value + " estimated size (" + value.sizeInBytes()
+                        + ") is smaller than the actual size" + bos.size());
+            }
+            accumulatedSize += value.sizeInBytes();
+        }
+        bos.reset();
+        msgList.write(dos);
+        if (accumulatedSize < bos.size()) {
+            throw new Exception("Estimated list size (" + accumulatedSize + ") is smaller than the actual size"
+                    + bos.size());
+        }
+    }
+
 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index d46457c..ab564fa 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -205,7 +205,11 @@
                  */
                 if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
                     terminate = false;
-                aggregator.step(vertex);
+
+                if (msgContentList.segmentEnd()) {
+                    /** the if condition makes sure aggregate only calls once per-vertex */
+                    aggregator.step(vertex);
+                }
             }
 
             @Override
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 3d52a45..acd766e 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -180,7 +180,7 @@
                         throw new HyracksDataException(e);
                     }
                 }
-                return size;
+                return size * 2;
             }
 
             private void emitResultTuple(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
@@ -224,4 +224,4 @@
 
         };
     }
-}
\ No newline at end of file
+}