Merge branch 'master' into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 76f411b..f71ee1d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -17,7 +17,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -47,10 +46,9 @@
private int dataFrameCount;
private int[] tPointers;
+ private int[] tPointersTemp;
private int tupleCount;
- private Random rand = new Random();
-
public FrameSorter(IHyracksTaskContext ctx, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) throws HyracksDataException {
@@ -119,8 +117,8 @@
}
}
if (tupleCount > 0) {
- shuffle(tPointers, 4, tupleCount);
- sort(tPointers, 0, tupleCount);
+ tPointersTemp = new int[tPointers.length];
+ sort(0, tupleCount);
}
}
@@ -146,75 +144,73 @@
}
}
- private void sort(int[] tPointers, int offset, int length) {
- int m = offset + (length >> 1);
- int mi = tPointers[m * 4];
- int mj = tPointers[m * 4 + 1];
- int mv = tPointers[m * 4 + 3];
-
- int a = offset;
- int b = a;
- int c = offset + length - 1;
- int d = c;
- while (true) {
- while (b <= c) {
- int cmp = compare(tPointers, b, mi, mj, mv);
- if (cmp > 0) {
- break;
+ private void sort(int offset, int length) {
+ int step = 1;
+ int len = length;
+ int end = offset + len;
+ /** bottom-up merge */
+ while (step < len) {
+ /** merge */
+ for (int i = offset; i < end; i += 2 * step) {
+ int next = i + step;
+ if (next < end) {
+ merge(i, next, step, Math.min(step, end - next));
+ } else {
+ System.arraycopy(tPointers, i * 4, tPointersTemp, i * 4, (end - i) * 4);
}
- if (cmp == 0) {
- swap(tPointers, a++, b);
- }
- ++b;
}
- while (c >= b) {
- int cmp = compare(tPointers, c, mi, mj, mv);
- if (cmp < 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, c, d--);
- }
- --c;
+ /** prepare next phase merge */
+ step *= 2;
+ int[] tmp = tPointersTemp;
+ tPointersTemp = tPointers;
+ tPointers = tmp;
+ }
+ }
+
+ /** Merge the runs [start1, start1 + len1) and [start2, start2 + len2) of tPointers into tPointersTemp, writing from start1 */
+ private void merge(int start1, int start2, int len1, int len2) {
+ int targetPos = start1;
+ int pos1 = start1;
+ int pos2 = start2;
+ int end1 = start1 + len1 - 1;
+ int end2 = start2 + len2 - 1;
+ while (pos1 <= end1 && pos2 <= end2) {
+ int cmp = compare(pos1, pos2);
+ if (cmp <= 0) {
+ copy(pos1, targetPos);
+ pos1++;
+ } else {
+ copy(pos2, targetPos);
+ pos2++;
}
- if (b > c)
- break;
- swap(tPointers, b++, c--);
+ targetPos++;
}
-
- int s;
- int n = offset + length;
- s = Math.min(a - offset, b - a);
- vecswap(tPointers, offset, b - s, s);
- s = Math.min(d - c, n - d - 1);
- vecswap(tPointers, b, n - s, s);
-
- if ((s = b - a) > 1) {
- sort(tPointers, offset, s);
+ if (pos1 <= end1) {
+ int rest = end1 - pos1 + 1;
+ System.arraycopy(tPointers, pos1 * 4, tPointersTemp, targetPos * 4, rest * 4);
}
- if ((s = d - c) > 1) {
- sort(tPointers, n - s, s);
+ if (pos2 <= end2) {
+ int rest = end2 - pos2 + 1;
+ System.arraycopy(tPointers, pos2 * 4, tPointersTemp, targetPos * 4, rest * 4);
}
}
- private void swap(int x[], int a, int b) {
- for (int i = 0; i < 4; ++i) {
- int t = x[a * 4 + i];
- x[a * 4 + i] = x[b * 4 + i];
- x[b * 4 + i] = t;
- }
+ private void copy(int src, int dest) {
+ tPointersTemp[dest * 4] = tPointers[src * 4];
+ tPointersTemp[dest * 4 + 1] = tPointers[src * 4 + 1];
+ tPointersTemp[dest * 4 + 2] = tPointers[src * 4 + 2];
+ tPointersTemp[dest * 4 + 3] = tPointers[src * 4 + 3];
}
- private void vecswap(int x[], int a, int b, int n) {
- for (int i = 0; i < n; i++, a++, b++) {
- swap(x, a, b);
- }
- }
-
- private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {
+ private int compare(int tp1, int tp2) {
int i1 = tPointers[tp1 * 4];
int j1 = tPointers[tp1 * 4 + 1];
int v1 = tPointers[tp1 * 4 + 3];
+
+ int tp2i = tPointers[tp2 * 4];
+ int tp2j = tPointers[tp2 * 4 + 1];
+ int tp2v = tPointers[tp2 * 4 + 3];
+
if (v1 != tp2v) {
return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
}
@@ -244,18 +240,6 @@
return 0;
}
- private void shuffle(int[] tPointers, int interval, int tupleCount) {
- for (int i = tupleCount; i > 1; i--) {
- int next = rand.nextInt(i) * interval;
- int target = (i - 1) * interval;
- for (int j = 0; j < interval; j++) {
- int drawn = tPointers[next + j];
- tPointers[next + j] = tPointers[target + j];
- tPointers[target + j] = drawn;
- }
- }
- }
-
public void close() {
this.buffers.clear();
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeNonExistentKeyException.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeNonExistentKeyException.java
deleted file mode 100644
index 7237046..0000000
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeNonExistentKeyException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
-
-public class BTreeNonExistentKeyException extends BTreeException {
-
- private static final long serialVersionUID = 1L;
-
- public BTreeNonExistentKeyException(Exception e) {
- super(e);
- }
-
- public BTreeNonExistentKeyException(String message) {
- super(message);
- }
-}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
index b503c8b..93cde3d 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
@@ -25,8 +25,6 @@
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.IPrefixSlotManager;
import edu.uci.ics.hyracks.storage.am.btree.compressors.FieldPrefixCompressor;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
import edu.uci.ics.hyracks.storage.am.btree.impls.FieldPrefixPrefixTupleReference;
import edu.uci.ics.hyracks.storage.am.btree.impls.FieldPrefixSlotManager;
@@ -37,6 +35,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleMode;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
@@ -370,7 +370,7 @@
int tupleIndex = slotManager.decodeSecondSlotField(slot);
// Error indicator is set if there is an exact match.
if (tupleIndex == slotManager.getErrorIndicator()) {
- throw new BTreeDuplicateKeyException("Trying to insert duplicate key into leaf node.");
+ throw new TreeIndexDuplicateKeyException("Trying to insert duplicate key into leaf node.");
}
return slot;
}
@@ -382,7 +382,7 @@
int tupleIndex = slotManager.decodeSecondSlotField(slot);
// Error indicator is set if there is an exact match.
if (tupleIndex == slotManager.getErrorIndicator()) {
- throw new BTreeDuplicateKeyException("Trying to insert duplicate key into leaf node.");
+ throw new TreeIndexDuplicateKeyException("Trying to insert duplicate key into leaf node.");
}
return slot;
}
@@ -411,7 +411,7 @@
int tupleIndex = slotManager.decodeSecondSlotField(slot);
// Error indicator is set if there is no exact match.
if (tupleIndex == slotManager.getErrorIndicator()) {
- throw new BTreeNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
+ throw new TreeIndexNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
}
return slot;
}
@@ -423,7 +423,7 @@
int tupleIndex = slotManager.decodeSecondSlotField(slot);
// Error indicator is set if there is no exact match.
if (tupleIndex == slotManager.getErrorIndicator()) {
- throw new BTreeNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
+ throw new TreeIndexNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
}
return slot;
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
index 1974989..187cd52 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
@@ -20,14 +20,14 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.frames.TreeIndexNSMFrame;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleMode;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
@@ -72,7 +72,7 @@
FindTupleNoExactMatchPolicy.HIGHER_KEY);
// Error indicator is set if there is an exact match.
if (tupleIndex == slotManager.getErrorIndicator()) {
- throw new BTreeDuplicateKeyException("Trying to insert duplicate key into leaf node.");
+ throw new TreeIndexDuplicateKeyException("Trying to insert duplicate key into leaf node.");
}
return tupleIndex;
}
@@ -83,7 +83,7 @@
FindTupleNoExactMatchPolicy.HIGHER_KEY);
// Error indicator is set if there is no exact match.
if (tupleIndex == slotManager.getErrorIndicator() || tupleIndex == slotManager.getGreatestKeyIndicator()) {
- throw new BTreeNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
+ throw new TreeIndexNonExistentKeyException("Trying to update a tuple with a nonexistent key in leaf node.");
}
return tupleIndex;
}
@@ -121,7 +121,7 @@
FindTupleNoExactMatchPolicy.HIGHER_KEY);
// Error indicator is set if there is no exact match.
if (tupleIndex == slotManager.getErrorIndicator() || tupleIndex == slotManager.getGreatestKeyIndicator()) {
- throw new BTreeNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
+ throw new TreeIndexNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
}
return tupleIndex;
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index 689843b..ff94040 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -33,7 +33,6 @@
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.ITupleAcceptor;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNotUpdateableException;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrame;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
@@ -53,6 +52,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.api.UnsortedInputException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
import edu.uci.ics.hyracks.storage.am.common.impls.AbstractTreeIndex;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -562,7 +563,7 @@
// This means that there could be underflow, even an empty page that is
// pointed to by an interior node.
if (ctx.leafFrame.getTupleCount() == 0) {
- throw new BTreeNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
+ throw new TreeIndexNonExistentKeyException("Trying to delete a tuple with a nonexistent key in leaf node.");
}
int tupleIndex = ctx.leafFrame.findDeleteTupleIndex(tuple);
ITupleReference beforeTuple = ctx.leafFrame.getMatchingKeyTuple(tuple, tupleIndex);
@@ -1024,9 +1025,12 @@
protected void verifyInputTuple(ITupleReference tuple, ITupleReference prevTuple) throws IndexException,
HyracksDataException {
// New tuple should be strictly greater than last tuple.
- if (cmp.compare(tuple, prevTuple) <= 0) {
- throw new UnsortedInputException(
- "Input stream given to BTree bulk load is not sorted or has duplicates.");
+ int cmpResult = cmp.compare(tuple, prevTuple);
+ if (cmpResult < 0) {
+ throw new UnsortedInputException("Input stream given to BTree bulk load is not sorted.");
+ }
+ if (cmpResult == 0) {
+ throw new TreeIndexDuplicateKeyException("Input stream given to BTree bulk load has duplicates.");
}
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 4a9cfb4..092fada 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -30,6 +30,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -97,7 +99,11 @@
switch (op) {
case INSERT: {
- indexAccessor.insert(tuple);
+ try {
+ indexAccessor.insert(tuple);
+ } catch (TreeIndexDuplicateKeyException e) {
+ // ignore that exception so that inserting an already-existing key becomes a no-op
+ }
break;
}
case UPDATE: {
@@ -109,7 +115,11 @@
break;
}
case DELETE: {
- indexAccessor.delete(tuple);
+ try {
+ indexAccessor.delete(tuple);
+ } catch (TreeIndexNonExistentKeyException e) {
+ // ignore that exception to allow deletions of non-existing keys
+ }
break;
}
default: {
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexDuplicateKeyException.java
similarity index 70%
rename from hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
rename to hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexDuplicateKeyException.java
index cde5022..1767504 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexDuplicateKeyException.java
@@ -13,16 +13,18 @@
* limitations under the License.
*/
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
+package edu.uci.ics.hyracks.storage.am.common.exceptions;
-public class BTreeDuplicateKeyException extends BTreeException {
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+
+public class TreeIndexDuplicateKeyException extends TreeIndexException {
private static final long serialVersionUID = 1L;
- public BTreeDuplicateKeyException(Exception e) {
+ public TreeIndexDuplicateKeyException(Exception e) {
super(e);
}
- public BTreeDuplicateKeyException(String message) {
+ public TreeIndexDuplicateKeyException(String message) {
super(message);
}
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexNonExistentKeyException.java
similarity index 61%
copy from hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
copy to hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexNonExistentKeyException.java
index cde5022..8b62063 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/exceptions/BTreeDuplicateKeyException.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/exceptions/TreeIndexNonExistentKeyException.java
@@ -13,16 +13,19 @@
* limitations under the License.
*/
-package edu.uci.ics.hyracks.storage.am.btree.exceptions;
+package edu.uci.ics.hyracks.storage.am.common.exceptions;
-public class BTreeDuplicateKeyException extends BTreeException {
- private static final long serialVersionUID = 1L;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
- public BTreeDuplicateKeyException(Exception e) {
- super(e);
- }
+public class TreeIndexNonExistentKeyException extends TreeIndexException {
- public BTreeDuplicateKeyException(String message) {
- super(message);
- }
+ private static final long serialVersionUID = 1L;
+
+ public TreeIndexNonExistentKeyException(Exception e) {
+ super(e);
+ }
+
+ public TreeIndexNonExistentKeyException(String message) {
+ super(message);
+ }
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 19d40a0..a85a174 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -12,352 +12,356 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package edu.uci.ics.hyracks.storage.am.common.impls;
-
-import java.util.ArrayList;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
-import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-
-public abstract class AbstractTreeIndex implements ITreeIndex {
-
- protected final static int rootPage = 1;
-
- protected final IBufferCache bufferCache;
- protected final IFileMapProvider fileMapProvider;
- protected final IFreePageManager freePageManager;
-
- protected final ITreeIndexFrameFactory interiorFrameFactory;
- protected final ITreeIndexFrameFactory leafFrameFactory;
-
- protected final IBinaryComparatorFactory[] cmpFactories;
- protected final int fieldCount;
-
- protected FileReference file;
- protected int fileId = -1;
-
- private boolean isActivated = false;
-
- public AbstractTreeIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
- IFreePageManager freePageManager, ITreeIndexFrameFactory interiorFrameFactory,
- ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount,
- FileReference file) {
- this.bufferCache = bufferCache;
- this.fileMapProvider = fileMapProvider;
- this.freePageManager = freePageManager;
- this.interiorFrameFactory = interiorFrameFactory;
- this.leafFrameFactory = leafFrameFactory;
- this.cmpFactories = cmpFactories;
- this.fieldCount = fieldCount;
- this.file = file;
- }
-
- public synchronized void create() throws HyracksDataException {
- if (isActivated) {
- throw new HyracksDataException("Failed to create the index since it is activated.");
- }
-
- boolean fileIsMapped = false;
- synchronized (fileMapProvider) {
- fileIsMapped = fileMapProvider.isMapped(file);
- if (!fileIsMapped) {
- bufferCache.createFile(file);
- }
- fileId = fileMapProvider.lookupFileId(file);
- try {
- // Also creates the file if it doesn't exist yet.
- bufferCache.openFile(fileId);
- } catch (HyracksDataException e) {
- // Revert state of buffer cache since file failed to open.
- if (!fileIsMapped) {
- bufferCache.deleteFile(fileId, false);
- }
- throw e;
- }
- }
-
- freePageManager.open(fileId);
- initEmptyTree();
- freePageManager.close();
- bufferCache.closeFile(fileId);
- }
-
- private void initEmptyTree() throws HyracksDataException {
- ITreeIndexFrame frame = leafFrameFactory.createFrame();
- ITreeIndexMetaDataFrame metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
- freePageManager.init(metaFrame, rootPage);
-
- ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
- rootNode.acquireWriteLatch();
- try {
- frame.setPage(rootNode);
- frame.initBuffer((byte) 0);
- } finally {
- rootNode.releaseWriteLatch();
- bufferCache.unpin(rootNode);
- }
- }
-
- public synchronized void activate() throws HyracksDataException {
- if (isActivated) {
- throw new HyracksDataException("Failed to activate the index since it is already activated.");
- }
-
- boolean fileIsMapped = false;
- synchronized (fileMapProvider) {
- fileIsMapped = fileMapProvider.isMapped(file);
- if (!fileIsMapped) {
- bufferCache.createFile(file);
- }
- fileId = fileMapProvider.lookupFileId(file);
- try {
- // Also creates the file if it doesn't exist yet.
- bufferCache.openFile(fileId);
- } catch (HyracksDataException e) {
- // Revert state of buffer cache since file failed to open.
- if (!fileIsMapped) {
- bufferCache.deleteFile(fileId, false);
- }
- throw e;
- }
- }
- freePageManager.open(fileId);
-
- // TODO: Should probably have some way to check that the tree is physically consistent
- // or that the file we just opened actually is a tree
-
- isActivated = true;
- }
-
- public synchronized void deactivate() throws HyracksDataException {
- if (!isActivated) {
- throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
- }
-
- bufferCache.closeFile(fileId);
- freePageManager.close();
-
- isActivated = false;
- }
-
- public synchronized void destroy() throws HyracksDataException {
- if (isActivated) {
- throw new HyracksDataException("Failed to destroy the index since it is activated.");
- }
-
- file.delete();
- if (fileId == -1) {
- return;
- }
-
- bufferCache.deleteFile(fileId, false);
- fileId = -1;
- }
-
- public synchronized void clear() throws HyracksDataException {
- if (!isActivated) {
- throw new HyracksDataException("Failed to clear the index since it is not activated.");
- }
- initEmptyTree();
- }
-
- public boolean isEmptyTree(ITreeIndexFrame frame) throws HyracksDataException {
- ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
- rootNode.acquireReadLatch();
- try {
- frame.setPage(rootNode);
- if (frame.getLevel() == 0 && frame.getTupleCount() == 0) {
- return true;
- } else {
- return false;
- }
- } finally {
- rootNode.releaseReadLatch();
- bufferCache.unpin(rootNode);
- }
- }
-
- public byte getTreeHeight(ITreeIndexFrame frame) throws HyracksDataException {
- ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
- rootNode.acquireReadLatch();
- try {
- frame.setPage(rootNode);
- return frame.getLevel();
- } finally {
- rootNode.releaseReadLatch();
- bufferCache.unpin(rootNode);
- }
- }
-
- public int getFileId() {
- return fileId;
- }
-
- public FileReference getFileReference() {
- return file;
- }
-
- public IBufferCache getBufferCache() {
- return bufferCache;
- }
-
- public ITreeIndexFrameFactory getInteriorFrameFactory() {
- return interiorFrameFactory;
- }
-
- public ITreeIndexFrameFactory getLeafFrameFactory() {
- return leafFrameFactory;
- }
-
- public IBinaryComparatorFactory[] getComparatorFactories() {
- return cmpFactories;
- }
-
- public IFreePageManager getFreePageManager() {
- return freePageManager;
- }
-
- public int getRootPageId() {
- return rootPage;
- }
-
- public int getFieldCount() {
- return fieldCount;
- }
-
- public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {
- protected final MultiComparator cmp;
- protected final int slotSize;
- protected final int leafMaxBytes;
- protected final int interiorMaxBytes;
- protected final ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<NodeFrontier>();
- protected final ITreeIndexMetaDataFrame metaFrame;
- protected final ITreeIndexTupleWriter tupleWriter;
- protected ITreeIndexFrame leafFrame;
- protected ITreeIndexFrame interiorFrame;
-
- public AbstractTreeIndexBulkLoader(float fillFactor) throws TreeIndexException, HyracksDataException {
- leafFrame = leafFrameFactory.createFrame();
- interiorFrame = interiorFrameFactory.createFrame();
- metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
-
- if (!isEmptyTree(leafFrame)) {
- throw new TreeIndexException("Cannot bulk-load a non-empty tree.");
- }
-
- this.cmp = MultiComparator.createIgnoreFieldLength(cmpFactories);
-
- leafFrame.setMultiComparator(cmp);
- interiorFrame.setMultiComparator(cmp);
-
- tupleWriter = leafFrame.getTupleWriter();
-
- NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
- leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
- leafFrontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId), true);
- leafFrontier.page.acquireWriteLatch();
-
- interiorFrame.setPage(leafFrontier.page);
- interiorFrame.initBuffer((byte) 0);
- interiorMaxBytes = (int) ((float) interiorFrame.getBuffer().capacity() * fillFactor);
-
- leafFrame.setPage(leafFrontier.page);
- leafFrame.initBuffer((byte) 0);
- leafMaxBytes = (int) ((float) leafFrame.getBuffer().capacity() * fillFactor);
- slotSize = leafFrame.getSlotSize();
-
- nodeFrontiers.add(leafFrontier);
- }
-
- public abstract void add(ITupleReference tuple) throws IndexException, HyracksDataException;
-
- protected void handleException() throws HyracksDataException {
- // Unlatch and unpin pages.
- for (NodeFrontier nodeFrontier : nodeFrontiers) {
- nodeFrontier.page.releaseWriteLatch();
- bufferCache.unpin(nodeFrontier.page);
- }
- }
-
- @Override
- public void end() throws HyracksDataException {
- // copy the root generated from the bulk-load to *the* root page location
- ICachedPage newRoot = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
- newRoot.acquireWriteLatch();
- NodeFrontier lastNodeFrontier = nodeFrontiers.get(nodeFrontiers.size() - 1);
- try {
- System.arraycopy(lastNodeFrontier.page.getBuffer().array(), 0, newRoot.getBuffer().array(), 0,
- lastNodeFrontier.page.getBuffer().capacity());
- } finally {
- newRoot.releaseWriteLatch();
- bufferCache.unpin(newRoot);
-
- // register old root as a free page
- freePageManager.addFreePage(metaFrame, lastNodeFrontier.pageId);
-
- for (int i = 0; i < nodeFrontiers.size(); i++) {
- nodeFrontiers.get(i).page.releaseWriteLatch();
- bufferCache.unpin(nodeFrontiers.get(i).page);
- }
- }
- }
-
- protected void addLevel() throws HyracksDataException {
- NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
- frontier.pageId = freePageManager.getFreePage(metaFrame);
- frontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, frontier.pageId), true);
- frontier.page.acquireWriteLatch();
- frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
- interiorFrame.setPage(frontier.page);
- interiorFrame.initBuffer((byte) nodeFrontiers.size());
- nodeFrontiers.add(frontier);
- }
- }
-
- public class TreeIndexInsertBulkLoader implements IIndexBulkLoader {
- ITreeIndexAccessor accessor;
-
- public TreeIndexInsertBulkLoader() throws HyracksDataException {
- accessor = (ITreeIndexAccessor) createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- }
-
- @Override
- public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- accessor.insert(tuple);
- } catch (IndexException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void end() throws HyracksDataException {
- // do nothing
- }
-
- }
-
- @Override
- public long getMemoryAllocationSize() {
- return 0;
- }
+
+package edu.uci.ics.hyracks.storage.am.common.impls;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public abstract class AbstractTreeIndex implements ITreeIndex {
+
+ protected final static int rootPage = 1;
+
+ protected final IBufferCache bufferCache;
+ protected final IFileMapProvider fileMapProvider;
+ protected final IFreePageManager freePageManager;
+
+ protected final ITreeIndexFrameFactory interiorFrameFactory;
+ protected final ITreeIndexFrameFactory leafFrameFactory;
+
+ protected final IBinaryComparatorFactory[] cmpFactories;
+ protected final int fieldCount;
+
+ protected FileReference file;
+ protected int fileId = -1;
+
+ private boolean isActivated = false;
+
+ /**
+ * Stores the collaborators and configuration of this tree index. The constructor only assigns
+ * fields; the file is not created or opened here (see create() and activate()).
+ *
+ * @param bufferCache
+ * buffer cache used to create, open, close and delete the index file and to pin its pages
+ * @param fileMapProvider
+ * used to check whether the file is mapped and to look up its file id
+ * @param freePageManager
+ * free-page manager that is opened against the file id of this index
+ * @param interiorFrameFactory
+ * factory for the frames used on interior pages
+ * @param leafFrameFactory
+ * factory for the frames used on leaf pages
+ * @param cmpFactories
+ * comparator factories, returned by getComparatorFactories()
+ * @param fieldCount
+ * value returned by getFieldCount()
+ * @param file
+ * the file backing this index
+ */
+ public AbstractTreeIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+ IFreePageManager freePageManager, ITreeIndexFrameFactory interiorFrameFactory,
+ ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount,
+ FileReference file) {
+ this.bufferCache = bufferCache;
+ this.fileMapProvider = fileMapProvider;
+ this.freePageManager = freePageManager;
+ this.interiorFrameFactory = interiorFrameFactory;
+ this.leafFrameFactory = leafFrameFactory;
+ this.cmpFactories = cmpFactories;
+ this.fieldCount = fieldCount;
+ this.file = file;
+ }
+
+    /**
+     * Creates the index file (if it is not mapped yet), initializes it as an empty tree and
+     * closes it again. The index is left deactivated.
+     *
+     * @throws HyracksDataException
+     *             if the index is currently activated, or if opening or initializing the file fails
+     */
+    public synchronized void create() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to create the index since it is activated.");
+        }
+
+        synchronized (fileMapProvider) {
+            boolean fileIsMapped = fileMapProvider.isMapped(file);
+            if (!fileIsMapped) {
+                bufferCache.createFile(file);
+            }
+            fileId = fileMapProvider.lookupFileId(file);
+            try {
+                // Also creates the file if it doesn't exist yet.
+                bufferCache.openFile(fileId);
+            } catch (HyracksDataException e) {
+                // Revert state of buffer cache since file failed to open.
+                if (!fileIsMapped) {
+                    bufferCache.deleteFile(fileId, false);
+                }
+                throw e;
+            }
+        }
+
+        // The file is open from here on: make sure it (and the free-page manager) are closed
+        // again even when initialization fails, in the same order as on the success path.
+        try {
+            freePageManager.open(fileId);
+            try {
+                initEmptyTree();
+            } finally {
+                freePageManager.close();
+            }
+        } finally {
+            bufferCache.closeFile(fileId);
+        }
+    }
+
+    /**
+     * Resets the free-page manager metadata and re-initializes the root page as an empty
+     * level-0 page, using a frame from the leaf frame factory.
+     */
+    private void initEmptyTree() throws HyracksDataException {
+        ITreeIndexFrame rootFrame = leafFrameFactory.createFrame();
+        ITreeIndexMetaDataFrame meta = freePageManager.getMetaDataFrameFactory().createFrame();
+        freePageManager.init(meta, rootPage);
+
+        ICachedPage root = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
+        root.acquireWriteLatch();
+        try {
+            rootFrame.setPage(root);
+            rootFrame.initBuffer((byte) 0);
+        } finally {
+            // Always give the page back, whether or not initialization succeeded.
+            root.releaseWriteLatch();
+            bufferCache.unpin(root);
+        }
+    }
+
+    /**
+     * Opens the index file (creating it in the buffer cache if it is not mapped yet), opens the
+     * free-page manager on it and marks the index as activated.
+     *
+     * @throws HyracksDataException
+     *             if the index is already activated or the file cannot be opened
+     */
+    public synchronized void activate() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to activate the index since it is already activated.");
+        }
+
+        synchronized (fileMapProvider) {
+            final boolean alreadyMapped = fileMapProvider.isMapped(file);
+            if (!alreadyMapped) {
+                bufferCache.createFile(file);
+            }
+            fileId = fileMapProvider.lookupFileId(file);
+            try {
+                // Also creates the file if it doesn't exist yet.
+                bufferCache.openFile(fileId);
+            } catch (HyracksDataException openFailure) {
+                // Undo the createFile() above; a file that was mapped before is left alone.
+                if (!alreadyMapped) {
+                    bufferCache.deleteFile(fileId, false);
+                }
+                throw openFailure;
+            }
+        }
+        freePageManager.open(fileId);
+
+        // TODO: Should probably have some way to check that the tree is physically consistent
+        // or that the file we just opened actually is a tree
+
+        isActivated = true;
+    }
+
+ /**
+ * Closes the index file and the free-page manager and marks the index as deactivated.
+ *
+ * @throws HyracksDataException
+ * if the index is not activated
+ */
+ public synchronized void deactivate() throws HyracksDataException {
+ if (!isActivated) {
+ throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
+ }
+
+ bufferCache.closeFile(fileId);
+ freePageManager.close();
+
+ isActivated = false;
+ }
+
+    /**
+     * Deletes the file backing this index. If a file id has been assigned, the file is also
+     * deleted from the buffer cache and the file id is reset to -1.
+     *
+     * @throws HyracksDataException
+     *             if the index is still activated
+     */
+    public synchronized void destroy() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to destroy the index since it is activated.");
+        }
+
+        file.delete();
+        // Only a file that was assigned an id needs to be removed from the buffer cache.
+        if (fileId != -1) {
+            bufferCache.deleteFile(fileId, false);
+            fileId = -1;
+        }
+    }
+
+ /**
+ * Re-initializes the index as an empty tree.
+ *
+ * @throws HyracksDataException
+ * if the index is not activated
+ */
+ public synchronized void clear() throws HyracksDataException {
+ if (!isActivated) {
+ throw new HyracksDataException("Failed to clear the index since it is not activated.");
+ }
+ initEmptyTree();
+ }
+
+    /**
+     * Checks whether the tree is empty, i.e. the root page is at level 0 and holds no tuples.
+     *
+     * @param frame
+     *            frame used to inspect the root page; it is left pointing at the root page
+     * @return true if the root page is at level 0 with a tuple count of 0
+     */
+    public boolean isEmptyTree(ITreeIndexFrame frame) throws HyracksDataException {
+        ICachedPage root = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
+        root.acquireReadLatch();
+        try {
+            frame.setPage(root);
+            return frame.getLevel() == 0 && frame.getTupleCount() == 0;
+        } finally {
+            root.releaseReadLatch();
+            bufferCache.unpin(root);
+        }
+    }
+
+    /**
+     * Reads the level stored in the root page.
+     *
+     * @param frame
+     *            frame used to inspect the root page; it is left pointing at the root page
+     * @return the level reported by the frame for the root page
+     */
+    public byte getTreeHeight(ITreeIndexFrame frame) throws HyracksDataException {
+        ICachedPage root = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
+        root.acquireReadLatch();
+        try {
+            frame.setPage(root);
+            final byte rootLevel = frame.getLevel();
+            return rootLevel;
+        } finally {
+            root.releaseReadLatch();
+            bufferCache.unpin(root);
+        }
+    }
+
+ /** @return the file id of the index file, or -1 if none is currently assigned */
+ public int getFileId() {
+ return fileId;
+ }
+
+ /** @return the file backing this index */
+ public FileReference getFileReference() {
+ return file;
+ }
+
+ /** @return the buffer cache this index operates on */
+ public IBufferCache getBufferCache() {
+ return bufferCache;
+ }
+
+ /** @return the factory for interior-page frames */
+ public ITreeIndexFrameFactory getInteriorFrameFactory() {
+ return interiorFrameFactory;
+ }
+
+ /** @return the factory for leaf-page frames */
+ public ITreeIndexFrameFactory getLeafFrameFactory() {
+ return leafFrameFactory;
+ }
+
+ /** @return the comparator factories passed to the constructor (the array itself, not a copy) */
+ public IBinaryComparatorFactory[] getComparatorFactories() {
+ return cmpFactories;
+ }
+
+ /** @return the free-page manager of this index */
+ public IFreePageManager getFreePageManager() {
+ return freePageManager;
+ }
+
+ /** @return the page id of the root page, which is the constant rootPage */
+ public int getRootPageId() {
+ return rootPage;
+ }
+
+ /** @return the field count passed to the constructor */
+ public int getFieldCount() {
+ return fieldCount;
+ }
+
+    /**
+     * Base class for bulk loaders of this tree index. It keeps one {@link NodeFrontier} per tree
+     * level (index 0 is the leaf level); every frontier page stays pinned and write-latched until
+     * the load finishes in {@link #end()} or is aborted through {@link #handleException()}.
+     * Subclasses implement {@link #add(ITupleReference)}.
+     */
+    public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {
+        protected final MultiComparator cmp;
+        protected final int slotSize;
+        protected final int leafMaxBytes;
+        protected final int interiorMaxBytes;
+        protected final ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<NodeFrontier>();
+        protected final ITreeIndexMetaDataFrame metaFrame;
+        protected final ITreeIndexTupleWriter tupleWriter;
+        protected ITreeIndexFrame leafFrame;
+        protected ITreeIndexFrame interiorFrame;
+        // True once the frontier pages have been unlatched and unpinned; guards against releasing twice.
+        private boolean releasedLatches;
+
+        /**
+         * Pins and write-latches the first leaf page and computes the per-page byte budgets.
+         *
+         * @param fillFactor
+         *            fraction of a page's buffer capacity that may be filled during the load
+         * @throws TreeIndexException
+         *             if the tree is not empty
+         */
+        public AbstractTreeIndexBulkLoader(float fillFactor) throws TreeIndexException, HyracksDataException {
+            leafFrame = leafFrameFactory.createFrame();
+            interiorFrame = interiorFrameFactory.createFrame();
+            metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
+
+            if (!isEmptyTree(leafFrame)) {
+                throw new TreeIndexException("Cannot bulk-load a non-empty tree.");
+            }
+
+            this.cmp = MultiComparator.createIgnoreFieldLength(cmpFactories);
+
+            leafFrame.setMultiComparator(cmp);
+            interiorFrame.setMultiComparator(cmp);
+
+            tupleWriter = leafFrame.getTupleWriter();
+
+            NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
+            leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
+            leafFrontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId), true);
+            leafFrontier.page.acquireWriteLatch();
+
+            // The interior frame is set on the leaf page first, to read the buffer capacity for
+            // the interior budget; the leaf frame is initialized last on the same page.
+            interiorFrame.setPage(leafFrontier.page);
+            interiorFrame.initBuffer((byte) 0);
+            interiorMaxBytes = (int) ((float) interiorFrame.getBuffer().capacity() * fillFactor);
+
+            leafFrame.setPage(leafFrontier.page);
+            leafFrame.initBuffer((byte) 0);
+            leafMaxBytes = (int) ((float) leafFrame.getBuffer().capacity() * fillFactor);
+            slotSize = leafFrame.getSlotSize();
+
+            nodeFrontiers.add(leafFrontier);
+        }
+
+        public abstract void add(ITupleReference tuple) throws IndexException, HyracksDataException;
+
+        /**
+         * Unlatches and unpins all frontier pages after a failed load. Calling it more than once,
+         * or after {@link #end()}, has no further effect.
+         */
+        protected void handleException() throws HyracksDataException {
+            releaseNodeFrontiers();
+        }
+
+        /**
+         * Finishes the load: copies the top-most frontier page to the fixed root page location,
+         * registers the copied-from page as free and releases all frontier pages.
+         */
+        @Override
+        public void end() throws HyracksDataException {
+            // copy the root generated from the bulk-load to *the* root page location
+            // NOTE(review): this copy also runs after handleException() has already unpinned the
+            // frontier pages; confirm that reading the unpinned page here is intended.
+            ICachedPage newRoot = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
+            newRoot.acquireWriteLatch();
+            NodeFrontier lastNodeFrontier = nodeFrontiers.get(nodeFrontiers.size() - 1);
+            try {
+                System.arraycopy(lastNodeFrontier.page.getBuffer().array(), 0, newRoot.getBuffer().array(), 0,
+                        lastNodeFrontier.page.getBuffer().capacity());
+            } finally {
+                try {
+                    newRoot.releaseWriteLatch();
+                    bufferCache.unpin(newRoot);
+
+                    // register old root as a free page
+                    freePageManager.addFreePage(metaFrame, lastNodeFrontier.pageId);
+                } finally {
+                    // Must run even if the steps above fail, otherwise the frontier pages
+                    // would stay latched and pinned.
+                    releaseNodeFrontiers();
+                }
+            }
+        }
+
+        /**
+         * Pins and write-latches a fresh page, initializes it as an interior page one level above
+         * the current top level and appends it to the frontiers.
+         */
+        protected void addLevel() throws HyracksDataException {
+            NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
+            frontier.pageId = freePageManager.getFreePage(metaFrame);
+            frontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, frontier.pageId), true);
+            frontier.page.acquireWriteLatch();
+            frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
+            interiorFrame.setPage(frontier.page);
+            interiorFrame.initBuffer((byte) nodeFrontiers.size());
+            nodeFrontiers.add(frontier);
+        }
+
+        /** Unlatches and unpins every frontier page, at most once per loader. */
+        private void releaseNodeFrontiers() throws HyracksDataException {
+            if (releasedLatches) {
+                return;
+            }
+            // Set the flag first so that a failure part-way through can never lead to a second
+            // release attempt on pages that were already unlatched.
+            releasedLatches = true;
+            for (NodeFrontier nodeFrontier : nodeFrontiers) {
+                nodeFrontier.page.releaseWriteLatch();
+                bufferCache.unpin(nodeFrontier.page);
+            }
+        }
+    }
+
+ /**
+ * Bulk loader that does not build pages itself: every tuple is inserted through a tree index
+ * accessor created with no-op operation callbacks.
+ */
+ public class TreeIndexInsertBulkLoader implements IIndexBulkLoader {
+ // Accessor that performs the individual inserts.
+ ITreeIndexAccessor accessor;
+
+ /** Creates the accessor used for all subsequent inserts. */
+ public TreeIndexInsertBulkLoader() throws HyracksDataException {
+ accessor = (ITreeIndexAccessor) createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ }
+
+ /**
+ * Inserts the tuple through the accessor.
+ *
+ * @throws HyracksDataException
+ * wrapping any IndexException raised by the insert
+ */
+ @Override
+ public void add(ITupleReference tuple) throws HyracksDataException {
+ try {
+ accessor.insert(tuple);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ /** Nothing to finalize: every tuple was already inserted by add(). */
+ @Override
+ public void end() throws HyracksDataException {
+ // do nothing
+ }
+
+ }
+
+ /** @return always 0 in this base class */
+ @Override
+ public long getMemoryAllocationSize() {
+ return 0;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 3557d73..59af99e 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
@@ -46,6 +45,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -279,7 +279,7 @@
memCursor.next();
LSMBTreeTupleReference lsmbtreeTuple = (LSMBTreeTupleReference) memCursor.getTuple();
if (!lsmbtreeTuple.isAntimatter()) {
- throw new BTreeDuplicateKeyException("Failed to insert key since key already exists.");
+ throw new TreeIndexDuplicateKeyException("Failed to insert key since key already exists.");
} else {
memCursor.close();
ctx.memBTreeAccessor.upsertIfConditionElseInsert(tuple, AntimatterAwareTupleAcceptor.INSTANCE);
@@ -299,7 +299,7 @@
search(ctx, searchCursor, predicate);
try {
if (searchCursor.hasNext()) {
- throw new BTreeDuplicateKeyException("Failed to insert key since key already exists.");
+ throw new TreeIndexDuplicateKeyException("Failed to insert key since key already exists.");
}
} finally {
searchCursor.close();
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 3eae5a7..244dfa0 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
@@ -43,6 +42,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IVirtualFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -305,7 +305,7 @@
ctx.keysOnlyTuple.reset(tuple);
try {
ctx.deletedKeysBTreeAccessor.insert(ctx.keysOnlyTuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// Key has already been deleted.
}
break;
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index 222f4de..c70c2d5 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -19,9 +19,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
@@ -33,6 +31,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
@@ -117,7 +117,7 @@
ITupleReference insertTuple = ctx.tupleIter.getTuple();
try {
btreeAccessor.insert(insertTuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// This exception may be caused by duplicate tokens in the same insert "document".
// We ignore such duplicate tokens in all inverted-index implementations, hence
// we can safely ignore this exception.
@@ -134,7 +134,7 @@
ITupleReference deleteTuple = ctx.tupleIter.getTuple();
try {
btreeAccessor.delete(deleteTuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Ignore this exception, since a document may have duplicate tokens.
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index e3e690b..285010c 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -23,8 +23,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -36,6 +34,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IVirtualFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -306,7 +306,7 @@
if (foundTupleInMemoryBTree) {
try {
ctx.memBTreeAccessor.delete(tuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Tuple has been deleted in the meantime. Do nothing.
// This normally shouldn't happen if we are dealing with
// good citizens since LSMRTree is used as a secondary
@@ -320,7 +320,7 @@
} else {
try {
ctx.memBTreeAccessor.insert(tuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// Do nothing, because one delete tuple is enough to indicate
// that all the corresponding insert tuples are deleted
}
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
index eafeff2..0a70185 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
@@ -46,6 +46,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.api.UnsortedInputException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.TreeIndexDiskOrderScanCursor;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -140,9 +141,9 @@
/**
* This test the btree page split. Originally this test didn't pass since
- * the btree was spliting by cardinality and not size. Thus, we might end
- * up with a situation where there is not enough space to insert the new
- * tuple after the split which will throw an error and the split won't be
+ * the btree was splitting by cardinality and not size. Thus, we might end up
+ * with a situation where there is not enough space to insert the new tuple
+ * after the split which will throw an error and the split won't be
* propagated to upper level; thus, the tree is corrupted. Now, it split
* page by size. The correct behavior on abnormally large keys/values.
*/
@@ -716,6 +717,12 @@
}
// Success.
break;
+ } catch (TreeIndexDuplicateKeyException e2) {
+ if (j != i) {
+ fail("Unexpected exception: " + e2.getMessage());
+ }
+ // Success.
+ break;
}
}
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
index 5b7b050..9de217b 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
@@ -35,7 +35,6 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
import edu.uci.ics.hyracks.storage.am.common.CheckTuple;
@@ -44,6 +43,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@SuppressWarnings("rawtypes")
@@ -205,7 +205,7 @@
// Set expected values. Do this only after insertion succeeds
// because we ignore duplicate keys.
ctx.insertCheckTuple(createStringCheckTuple(fieldValues, ctx.getKeyFieldCount()), ctx.getCheckTuples());
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// Ignore duplicate key insertions.
}
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java
index 262e21c..22d3e6a 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/multithread/BTreeTestWorker.java
@@ -19,8 +19,6 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNotUpdateableException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -31,6 +29,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
public class BTreeTestWorker extends AbstractIndexTestWorker {
@@ -60,7 +60,7 @@
case INSERT:
try {
accessor.insert(tuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// Ignore duplicate keys, since we get random tuples.
}
break;
@@ -74,7 +74,7 @@
deleteTuple.reset(deleteTb.getFieldEndOffsets(), deleteTb.getByteArray());
try {
accessor.delete(deleteTuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Ignore non-existant keys, since we get random tuples.
}
break;
@@ -82,7 +82,7 @@
case UPDATE:
try {
accessor.update(tuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Ignore non-existant keys, since we get random tuples.
} catch (BTreeNotUpdateableException e) {
// Ignore not updateable exception due to numKeys == numFields.
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 8a37b4e..4a96131 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -19,8 +19,6 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNotUpdateableException;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.AbstractIndexTestWorker;
@@ -30,6 +28,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree.LSMBTreeAccessor;
@@ -60,7 +60,7 @@
case INSERT:
try {
accessor.insert(tuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexDuplicateKeyException e) {
// Ignore duplicate keys, since we get random tuples.
}
break;
@@ -74,7 +74,7 @@
deleteTuple.reset(deleteTb.getFieldEndOffsets(), deleteTb.getByteArray());
try {
accessor.delete(deleteTuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Ignore non-existant keys, since we get random tuples.
}
break;
@@ -82,7 +82,7 @@
case UPDATE:
try {
accessor.update(tuple);
- } catch (BTreeNonExistentKeyException e) {
+ } catch (TreeIndexNonExistentKeyException e) {
// Ignore non-existant keys, since we get random tuples.
} catch (BTreeNotUpdateableException e) {
// Ignore not updateable exception due to numKeys == numFields.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index c52130d..26cb8d0 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -35,7 +36,6 @@
import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.JobStateUtils;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
/**
* User applications should all inherit {@link Vertex}, and implement their own
@@ -270,7 +270,7 @@
delegate.setVertex(this);
}
destEdgeList.clear();
- long edgeMapSize = SerDeUtils.readVLong(in);
+ long edgeMapSize = WritableUtils.readVLong(in);
for (long i = 0; i < edgeMapSize; ++i) {
Edge<I, E> edge = allocateEdge();
edge.setConf(getContext().getConfiguration());
@@ -278,7 +278,7 @@
addEdge(edge);
}
msgList.clear();
- long msgListSize = SerDeUtils.readVLong(in);
+ long msgListSize = WritableUtils.readVLong(in);
for (long i = 0; i < msgListSize; ++i) {
M msg = allocateMessage();
msg.readFields(in);
@@ -297,11 +297,11 @@
if (vertexValue != null) {
vertexValue.write(out);
}
- SerDeUtils.writeVLong(out, destEdgeList.size());
+ WritableUtils.writeVLong(out, destEdgeList.size());
for (Edge<I, E> edge : destEdgeList) {
edge.write(out);
}
- SerDeUtils.writeVLong(out, msgList.size());
+ WritableUtils.writeVLong(out, msgList.size());
for (M msg : msgList) {
msg.write(out);
}
@@ -618,7 +618,6 @@
* Terminate the current partition where the current vertex stays in.
* This will immediately take effect and the upcoming vertice in the
* same partition cannot be processed.
- *
*/
protected final void terminatePartition() {
voteToHalt();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
index 25b07ff..a4336a3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
@@ -40,47 +40,4 @@
object.readFields(input);
}
- public static long readVLong(DataInput in) throws IOException {
- int vLen = 0;
- long value = 0L;
- while (true) {
- byte b = (byte) in.readByte();
- ++vLen;
- value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
- if ((b & 0x80) == 0) {
- break;
- }
- }
- return value;
- }
-
- public static void writeVLong(DataOutput out, long value) throws IOException {
- long data = value;
- do {
- byte b = (byte) (data & 0x7f);
- data >>= 7;
- if (data != 0) {
- b |= 0x80;
- }
- out.write(b);
- } while (data != 0);
- }
-
- public static long readVLong(byte[] data, int start, int length) {
- int vLen = 0;
- long value = 0L;
- while (true) {
- byte b = (byte) data[start];
- ++vLen;
- value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
- if ((b & 0x80) == 0) {
- break;
- }
- ++start;
- }
- if (vLen != length)
- throw new IllegalStateException("length mismatch -- vLen:" + vLen + " length:" + length);
- return value;
- }
-
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 2d4064b..5325397 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -204,6 +204,7 @@
private void execute(DeploymentId deploymentId, JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
+ job.setMaxReattempts(0);
JobId jobId = hcc.startJob(deploymentId, job,
profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 478ac07..42359eb 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -241,7 +241,7 @@
typeTraits[1] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
- sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, false,
+ sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 49309ec..db18e53 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.TreeMap;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -197,7 +198,8 @@
public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
try {
IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+ Map<String, NodeControllerInfo> ncNameToNcInfos = new TreeMap<String, NodeControllerInfo>();
+ ncNameToNcInfos.putAll(hcc.getNodeControllerInfos());
NCs = new String[ncNameToNcInfos.size()];
ipToNcMapping = new HashMap<String, List<String>>();
int i = 0;
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 150bd8b..6386bdb 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -17,19 +17,15 @@
import java.io.DataInputStream;
import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameConstants;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
public class TupleDeserializer {
- private static final Logger LOGGER = Logger.getLogger(TupleDeserializer.class.getName());
-
+ private static String ERROR_MSG = "Out-of-bound read in your Writable implementations of types for vertex id, vertex value, edge value or message --- check your readFields and write implmenetation";
private Object[] record;
private RecordDescriptor recordDescriptor;
private ResetableByteArrayInputStream bbis;
@@ -43,132 +39,116 @@
}
public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
- for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbis.setByteArray(data, offset);
+ try {
+ for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ int len = tupleRef.getFieldLength(i);
+ bbis.setByteArray(data, offset);
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > len) {
+ throw new IllegalStateException(ERROR_MSG);
}
+
+ record[i] = instance;
}
+ return record;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- return record;
}
public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
throws HyracksDataException {
- byte[] data = left.getBuffer().array();
- int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
- int leftFieldCount = left.getFieldCount();
- int fStart = tStart;
- for (int i = 0; i < leftFieldCount; ++i) {
- /**
- * reset the input
- */
- fStart = tStart + left.getFieldStartOffset(tIndex, i);
- bbis.setByteArray(data, fStart);
+ try {
+ byte[] data = left.getBuffer().array();
+ int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
+ int leftFieldCount = left.getFieldCount();
+ int fStart = tStart;
+ for (int i = 0; i < leftFieldCount; ++i) {
+ /**
+ * reset the input
+ */
+ fStart = tStart + left.getFieldStartOffset(tIndex, i);
+ int fieldLength = left.getFieldLength(tIndex, i);
+ bbis.setByteArray(data, fStart);
- /**
- * do deserialization
- */
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- for (int i = leftFieldCount; i < record.length; ++i) {
- byte[] rightData = right.getFieldData(i - leftFieldCount);
- int rightOffset = right.getFieldStart(i - leftFieldCount);
- bbis.setByteArray(rightData, rightOffset);
+ /**
+ * do deserialization
+ */
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > fieldLength) {
+ throw new IllegalStateException(ERROR_MSG);
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
}
+ record[i] = instance;
}
+ for (int i = leftFieldCount; i < record.length; ++i) {
+ byte[] rightData = right.getFieldData(i - leftFieldCount);
+ int rightOffset = right.getFieldStart(i - leftFieldCount);
+ int len = right.getFieldLength(i - leftFieldCount);
+ bbis.setByteArray(rightData, rightOffset);
+
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > len) {
+ throw new IllegalStateException(ERROR_MSG);
+ }
+ record[i] = instance;
+ }
+ return record;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- return record;
}
public Object[] deserializeRecord(ArrayTupleBuilder tb, ITupleReference right) throws HyracksDataException {
- byte[] data = tb.getByteArray();
- int[] offset = tb.getFieldEndOffsets();
- int start = 0;
- for (int i = 0; i < offset.length; ++i) {
- /**
- * reset the input
- */
- bbis.setByteArray(data, start);
- start = offset[i];
+ try {
+ byte[] data = tb.getByteArray();
+ int[] offset = tb.getFieldEndOffsets();
+ int start = 0;
+ for (int i = 0; i < offset.length; ++i) {
+ /**
+ * reset the input
+ */
+ bbis.setByteArray(data, start);
+ start = offset[i];
+ int fieldLength = i == 0 ? offset[0] : offset[i] - offset[i - 1];
- /**
- * do deserialization
- */
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
+ /**
+ * do deserialization
+ */
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > fieldLength) {
+ throw new IllegalStateException(ERROR_MSG);
}
+ record[i] = instance;
}
- }
- for (int i = offset.length; i < record.length; ++i) {
- byte[] rightData = right.getFieldData(i - offset.length);
- int rightOffset = right.getFieldStart(i - offset.length);
- bbis.setByteArray(rightData, rightOffset);
+ for (int i = offset.length; i < record.length; ++i) {
+ byte[] rightData = right.getFieldData(i - offset.length);
+ int rightOffset = right.getFieldStart(i - offset.length);
+ bbis.setByteArray(rightData, rightOffset);
+ int fieldLength = right.getFieldLength(i - offset.length);
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(i + " " + instance);
- }
- record[i] = instance;
- if (FrameConstants.DEBUG_FRAME_IO) {
- try {
- if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
- throw new HyracksDataException("Field magic mismatch");
- }
- } catch (IOException e) {
- e.printStackTrace();
+ int availableBefore = bbis.available();
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ int availableAfter = bbis.available();
+ if (availableBefore - availableAfter > fieldLength) {
+ throw new IllegalStateException(ERROR_MSG);
}
+ record[i] = instance;
}
+ return record;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- return record;
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index ea1e02e..4421695 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
/**
* The buffer to hold updates.
@@ -96,7 +97,12 @@
fta.reset(buffer);
for (int j = 0; j < fta.getTupleCount(); j++) {
tuple.reset(fta, j);
- bta.update(tuple);
+ try {
+ bta.update(tuple);
+ } catch (TreeIndexNonExistentKeyException e) {
+ // the key does not exist yet, so fall back to inserting the tuple
+ bta.insert(tuple);
+ }
}
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
index 7d824ea..90065c2 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/data/VLongNormalizedKeyComputer.java
@@ -14,8 +14,13 @@
*/
package edu.uci.ics.pregelix.example.data;
+import java.io.DataInput;
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.WritableUtils;
+
import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
/**
* @author yingyib
@@ -26,34 +31,42 @@
private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
private static final int NEGATIVE_LONG_MASK = (0 << 30);
+ private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+ private DataInput dis = new DataInputStream(bis);
+
@Override
public int getNormalizedKey(byte[] bytes, int start, int length) {
- long value = SerDeUtils.readVLong(bytes, start, length);
- int highValue = (int) (value >> 32);
- if (highValue > 0) {
- /**
- * larger than Integer.MAX
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= POSTIVE_LONG_MASK;
- return highNmk;
- } else if (highValue == 0) {
- /**
- * smaller than Integer.MAX but >=0
- */
- int lowNmk = (int) value;
- lowNmk >>= 2;
- lowNmk |= NON_NEGATIVE_INT_MASK;
- return lowNmk;
- } else {
- /**
- * less than 0; TODO: have not optimized for that
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= NEGATIVE_LONG_MASK;
- return highNmk;
+ try {
+ bis.setByteArray(bytes, start);
+ long value = WritableUtils.readVLong(dis);
+ int highValue = (int) (value >> 32);
+ if (highValue > 0) {
+ /**
+ * larger than Integer.MAX
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= POSTIVE_LONG_MASK;
+ return highNmk;
+ } else if (highValue == 0) {
+ /**
+ * smaller than Integer.MAX but >=0
+ */
+ int lowNmk = (int) value;
+ lowNmk >>= 2;
+ lowNmk |= NON_NEGATIVE_INT_MASK;
+ return lowNmk;
+ } else {
+ /**
+ * less than 0; TODO: have not optimized for that
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= NEGATIVE_LONG_MASK;
+ return highNmk;
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
}
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
index ec1109f..1c5f629 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -16,14 +16,14 @@
package edu.uci.ics.pregelix.example.io;
import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+import java.io.DataInputStream;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
import edu.uci.ics.pregelix.api.io.WritableSizable;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
/**
* A WritableComparable for longs in a variable-length format. Such values take
@@ -32,17 +32,7 @@
* @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
*/
@SuppressWarnings("rawtypes")
-public class VLongWritable implements WritableComparable, WritableSizable {
- private static long ONE_BYTE_MAX = 2 ^ 7 - 1;
- private static long TWO_BYTE_MAX = 2 ^ 14 - 1;
- private static long THREE_BYTE_MAX = 2 ^ 21 - 1;
- private static long FOUR_BYTE_MAX = 2 ^ 28 - 1;
- private static long FIVE_BYTE_MAX = 2 ^ 35 - 1;;
- private static long SIX_BYTE_MAX = 2 ^ 42 - 1;;
- private static long SEVEN_BYTE_MAX = 2 ^ 49 - 1;;
- private static long EIGHT_BYTE_MAX = 2 ^ 54 - 1;;
-
- private long value;
+public class VLongWritable extends org.apache.hadoop.io.VLongWritable implements WritableSizable {
public VLongWritable() {
}
@@ -52,78 +42,46 @@
}
public int sizeInBytes() {
- if (value >= 0 && value <= ONE_BYTE_MAX) {
+ long i = get();
+ if (i >= -112 && i <= 127) {
return 1;
- } else if (value > ONE_BYTE_MAX && value <= TWO_BYTE_MAX) {
- return 2;
- } else if (value > TWO_BYTE_MAX && value <= THREE_BYTE_MAX) {
- return 3;
- } else if (value > THREE_BYTE_MAX && value <= FOUR_BYTE_MAX) {
- return 4;
- } else if (value > FOUR_BYTE_MAX && value <= FIVE_BYTE_MAX) {
- return 5;
- } else if (value > FIVE_BYTE_MAX && value <= SIX_BYTE_MAX) {
- return 6;
- } else if (value > SIX_BYTE_MAX && value <= SEVEN_BYTE_MAX) {
- return 7;
- } else if (value > SEVEN_BYTE_MAX && value <= EIGHT_BYTE_MAX) {
- return 8;
- } else {
- return 9;
}
- }
- /** Set the value of this LongWritable. */
- public void set(long value) {
- this.value = value;
- }
+ int len = -112;
+ if (i < 0) {
+ i ^= -1L; // take one's complement
+ len = -120;
+ }
- /** Return the value of this LongWritable. */
- public long get() {
- return value;
- }
+ long tmp = i;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
- public void readFields(DataInput in) throws IOException {
- value = SerDeUtils.readVLong(in);
- }
-
- public void write(DataOutput out) throws IOException {
- SerDeUtils.writeVLong(out, value);
- }
-
- /** Returns true iff <code>o</code> is a VLongWritable with the same value. */
- public boolean equals(Object o) {
- if (!(o instanceof VLongWritable))
- return false;
- VLongWritable other = (VLongWritable) o;
- return this.value == other.value;
- }
-
- public int hashCode() {
- return (int) value;
- }
-
- /** Compares two VLongWritables. */
- public int compareTo(Object o) {
- long thisValue = this.value;
- long thatValue = ((VLongWritable) o).value;
- return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
- }
-
- public String toString() {
- return Long.toString(value);
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+ return len + 1;
}
/** A Comparator optimized for LongWritable. */
public static class Comparator extends WritableComparator {
+ private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+ private DataInput dis = new DataInputStream(bis);
+
public Comparator() {
super(VLongWritable.class);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- long thisValue = SerDeUtils.readVLong(b1, s1, l1);
- long thatValue = SerDeUtils.readVLong(b2, s2, l2);
- return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+ try {
+ bis.setByteArray(b1, s1);
+ long thisValue = WritableUtils.readVLong(dis);
+ bis.setByteArray(b2, s2);
+ long thatValue = WritableUtils.readVLong(dis);
+ return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
}
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
index 196b114..638011b 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
@@ -41,10 +41,13 @@
public void testVLong() throws Exception {
Random rand = new Random(System.currentTimeMillis());
MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ msgList.add(new VLongWritable(Long.MAX_VALUE));
+ msgList.add(new VLongWritable(Long.MIN_VALUE));
+ msgList.add(new VLongWritable(-1));
for (int i = 0; i < 1000000; i++) {
msgList.add(new VLongWritable(Math.abs(rand.nextLong())));
}
- verifySizeEstimation(msgList);
+ verifyExactSizeEstimation(msgList);
}
@Test
@@ -96,7 +99,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testNull() throws Exception {
MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
@@ -105,7 +108,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testVInt() throws Exception {
Random rand = new Random(System.currentTimeMillis());
@@ -115,7 +118,7 @@
}
verifySizeEstimation(msgList);
}
-
+
@Test
public void testInt() throws Exception {
Random rand = new Random(System.currentTimeMillis());
@@ -148,4 +151,26 @@
}
}
+ /** Checks each element's sizeInBytes() equals its serialized size and the total estimate covers the list. */
+ private void verifyExactSizeEstimation(MsgList<WritableSizable> msgList) throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput dos = new DataOutputStream(bos);
+ int accumulatedSize = 5; // NOTE(review): presumably the fixed overhead of the serialized list itself -- confirm
+ for (int i = 0; i < msgList.size(); i++) {
+ bos.reset(); // measure one element at a time
+ WritableSizable value = msgList.get(i);
+ value.write(dos);
+ if (value.sizeInBytes() != bos.size()) {
+ throw new Exception(value + " estimated size (" + value.sizeInBytes()
+ + ") differs from the actual size (" + bos.size() + ")");
+ }
+ accumulatedSize += value.sizeInBytes();
+ }
+ bos.reset();
+ msgList.write(dos);
+ if (accumulatedSize < bos.size()) {
+ throw new Exception("Estimated list size (" + accumulatedSize + ") is smaller than the actual size ("
+ + bos.size() + ")");
+ }
+ }
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index d46457c..ab564fa 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -205,7 +205,11 @@
*/
if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
terminate = false;
- aggregator.step(vertex);
+
+ if (msgContentList.segmentEnd()) {
+ /** this condition makes sure the aggregator steps only once per vertex */
+ aggregator.step(vertex);
+ }
}
@Override
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 3d52a45..acd766e 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -180,7 +180,7 @@
throw new HyracksDataException(e);
}
}
- return size;
+ return size * 2;
}
private void emitResultTuple(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
@@ -224,4 +224,4 @@
};
}
-}
\ No newline at end of file
+}