Merged -r 330:354 from trunk
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@355 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/pom.xml b/hyracks-storage-am-btree/pom.xml
index 9a0e810..68c77e2 100644
--- a/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-storage-am-btree/pom.xml
@@ -2,12 +2,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-btree</artifactId>
- <version>0.1.4-SNAPSHOT</version>
+ <version>0.1.5-SNAPSHOT</version>
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
- <version>0.1.4-SNAPSHOT</version>
+ <version>0.1.5-SNAPSHOT</version>
</parent>
<build>
@@ -27,28 +27,28 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-common</artifactId>
- <version>0.1.4-SNAPSHOT</version>
+ <version>0.1.5-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.1.4-SNAPSHOT</version>
+ <version>0.1.5-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
- <version>0.1.4-SNAPSHOT</version>
+ <version>0.1.5-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
- <version>0.1.4-SNAPSHOT</version>
+ <version>0.1.5-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
index 1dd69ba..6e13ed7 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -26,56 +26,70 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-public class BTreeBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
- private float fillFactor;
- private final BTreeOpHelper btreeOpHelper;
- private FrameTupleAccessor accessor;
- private BTree.BulkLoadContext bulkLoadCtx;
+public class BTreeBulkLoadOperatorNodePushable extends
+ AbstractUnaryInputSinkOperatorNodePushable {
+ private float fillFactor;
+ private final BTreeOpHelper btreeOpHelper;
+ private FrameTupleAccessor accessor;
+ private BTree.BulkLoadContext bulkLoadCtx;
- private IRecordDescriptorProvider recordDescProvider;
+ private IRecordDescriptorProvider recordDescProvider;
- private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+ private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
- public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksStageletContext ctx,
- int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
- btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.CREATE_BTREE);
- this.fillFactor = fillFactor;
- this.recordDescProvider = recordDescProvider;
- tuple.setFieldPermutation(fieldPermutation);
- }
+ public BTreeBulkLoadOperatorNodePushable(
+ AbstractBTreeOperatorDescriptor opDesc,
+ IHyracksStageletContext ctx, int partition, int[] fieldPermutation,
+ float fillFactor, IRecordDescriptorProvider recordDescProvider) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition,
+ BTreeOpHelper.BTreeMode.CREATE_BTREE);
+ this.fillFactor = fillFactor;
+ this.recordDescProvider = recordDescProvider;
+ tuple.setFieldPermutation(fieldPermutation);
+ }
- @Override
- public void open() throws HyracksDataException {
- AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
- RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
- accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), recDesc);
- IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
- btreeOpHelper.init();
- btreeOpHelper.getBTree().open(btreeOpHelper.getBTreeFileId());
- bulkLoadCtx = btreeOpHelper.getBTree().beginBulkLoad(fillFactor, btreeOpHelper.getLeafFrame(),
- btreeOpHelper.getInteriorFrame(), metaFrame);
- }
+ @Override
+ public void open() throws HyracksDataException {
+ AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper
+ .getOperatorDescriptor();
+ RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(
+ opDesc.getOperatorId(), 0);
+ accessor = new FrameTupleAccessor(btreeOpHelper
+ .getHyracksStageletContext().getFrameSize(), recDesc);
+ IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
+ try {
+ btreeOpHelper.init();
+ btreeOpHelper.getBTree().open(btreeOpHelper.getBTreeFileId());
+ bulkLoadCtx = btreeOpHelper.getBTree().beginBulkLoad(fillFactor,
+ btreeOpHelper.getLeafFrame(),
+ btreeOpHelper.getInteriorFrame(), metaFrame);
+ } catch (Exception e) {
+ // cleanup in case of failure
+ btreeOpHelper.deinit();
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- tuple.reset(accessor, i);
- btreeOpHelper.getBTree().bulkLoadAddTuple(bulkLoadCtx, tuple);
- }
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ btreeOpHelper.getBTree().bulkLoadAddTuple(bulkLoadCtx, tuple);
+ }
+ }
- @Override
- public void close() throws HyracksDataException {
- try {
- btreeOpHelper.getBTree().endBulkLoad(bulkLoadCtx);
- } finally {
- btreeOpHelper.deinit();
- }
- }
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ btreeOpHelper.getBTree().endBulkLoad(bulkLoadCtx);
+ } finally {
+ btreeOpHelper.deinit();
+ }
+ }
- @Override
- public void flush() throws HyracksDataException {
- }
+ @Override
+ public void flush() throws HyracksDataException {
+ }
}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
index fe71c33..c362183 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -45,50 +45,56 @@
DiskOrderScanCursor cursor = new DiskOrderScanCursor(cursorFrame);
IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
- btreeOpHelper.init();
- btreeOpHelper.getBTree().diskOrderScan(cursor, cursorFrame, metaFrame);
-
- MultiComparator cmp = btreeOpHelper.getBTree().getMultiComparator();
- ByteBuffer frame = btreeOpHelper.getHyracksStageletContext().allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(btreeOpHelper.getHyracksStageletContext().getFrameSize());
- appender.reset(frame, true);
- ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
- DataOutput dos = tb.getDataOutput();
-
try {
- while (cursor.hasNext()) {
- tb.reset();
- cursor.next();
+
+ btreeOpHelper.init();
+
+ try {
+ btreeOpHelper.getBTree().diskOrderScan(cursor, cursorFrame, metaFrame);
- ITupleReference frameTuple = cursor.getTuple();
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
- tb.addFieldEndOffset();
- }
+ MultiComparator cmp = btreeOpHelper.getBTree().getMultiComparator();
+ ByteBuffer frame = btreeOpHelper.getHyracksStageletContext().allocateFrame();
+ FrameTupleAppender appender = new FrameTupleAppender(btreeOpHelper.getHyracksStageletContext().getFrameSize());
+ appender.reset(frame, true);
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
+ DataOutput dos = tb.getDataOutput();
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
+ while (cursor.hasNext()) {
+ tb.reset();
+ cursor.next();
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
- }
+ ITupleReference frameTuple = cursor.getTuple();
+ for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+ dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+ tb.addFieldEndOffset();
+ }
- cursor.close();
- writer.close();
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
- } catch (Exception e) {
- e.printStackTrace();
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame, writer);
+ }
+ }
+ finally {
+ cursor.close();
+ writer.close();
+ }
+
+ } catch(Exception e) {
+ deinitialize();
+ throw new HyracksDataException(e);
}
}
@Override
public void deinitialize() throws HyracksDataException {
- btreeOpHelper.deinit();
+ btreeOpHelper.deinit();
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java
index 23a6601..c087fdd 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java
@@ -15,6 +15,9 @@
package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -27,7 +30,9 @@
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public class BTreeDropOperatorNodePushable extends AbstractOperatorNodePushable {
- private final IHyracksStageletContext ctx;
+ private static final Logger LOGGER = Logger.getLogger(BTreeDropOperatorNodePushable.class.getName());
+
+ private final IHyracksStageletContext ctx;
private IBTreeRegistryProvider btreeRegistryProvider;
private IStorageManagerInterface storageManager;
private IFileSplitProvider fileSplitProvider;
@@ -58,30 +63,39 @@
@Override
public void initialize() throws HyracksDataException {
+ try {
- BTreeRegistry btreeRegistry = btreeRegistryProvider.getBTreeRegistry(ctx);
- IBufferCache bufferCache = storageManager.getBufferCache(ctx);
- IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
+ BTreeRegistry btreeRegistry = btreeRegistryProvider.getBTreeRegistry(ctx);
+ IBufferCache bufferCache = storageManager.getBufferCache(ctx);
+ IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
- FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
+ FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
- boolean fileIsMapped = fileMapProvider.isMapped(f);
- if (!fileIsMapped) {
- throw new HyracksDataException("Cannot drop B-Tree with name " + f.toString() + ". No file mapping exists.");
- }
+ boolean fileIsMapped = fileMapProvider.isMapped(f);
+ if (!fileIsMapped) {
+ throw new HyracksDataException("Cannot drop B-Tree with name " + f.toString() + ". No file mapping exists.");
+ }
- int btreeFileId = fileMapProvider.lookupFileId(f);
+ int btreeFileId = fileMapProvider.lookupFileId(f);
- // unregister btree instance
- btreeRegistry.lock();
- try {
- btreeRegistry.unregister(btreeFileId);
- } finally {
- btreeRegistry.unlock();
- }
+ // unregister btree instance
+ btreeRegistry.lock();
+ try {
+ btreeRegistry.unregister(btreeFileId);
+ } finally {
+ btreeRegistry.unlock();
+ }
- // remove name to id mapping
- bufferCache.deleteFile(btreeFileId);
+ // remove name to id mapping
+ bufferCache.deleteFile(btreeFileId);
+ }
+ // TODO: for the time being we don't throw,
+ // with proper exception handling (no hanging job problem) we should throw
+ catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("BTRee Drop Operator Failed Due To Exception: " + e.getMessage());
+ }
+ }
}
@Override
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java
index 7557573..023cd40 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java
@@ -46,8 +46,12 @@
@Override
public void initialize() throws HyracksDataException {
- btreeOpHelper.init();
- btreeOpHelper.deinit();
+ try {
+ btreeOpHelper.init();
+ }
+ finally {
+ btreeOpHelper.deinit();
+ }
}
@Override
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
index e6ba012..acb7d0f 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
@@ -48,14 +48,20 @@
@Override
public void open() throws HyracksDataException {
- AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
- RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
- accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), inputRecDesc);
- writeBuffer = btreeOpHelper.getHyracksStageletContext().allocateFrame();
- btreeOpHelper.init();
- btreeOpHelper.getBTree().open(btreeOpHelper.getBTreeFileId());
- opCtx = btreeOpHelper.getBTree().createOpContext(op, btreeOpHelper.getLeafFrame(),
- btreeOpHelper.getInteriorFrame(), new MetaDataFrame());
+ AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
+ RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), inputRecDesc);
+ writeBuffer = btreeOpHelper.getHyracksStageletContext().allocateFrame();
+ try {
+ btreeOpHelper.init();
+ btreeOpHelper.getBTree().open(btreeOpHelper.getBTreeFileId());
+ opCtx = btreeOpHelper.getBTree().createOpContext(op, btreeOpHelper.getLeafFrame(),
+ btreeOpHelper.getInteriorFrame(), new MetaDataFrame());
+ } catch(Exception e) {
+ // cleanup in case of failure
+ btreeOpHelper.deinit();
+ throw new HyracksDataException(e);
+ }
}
@Override
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
index d01ceee..880cc25 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
@@ -29,23 +29,21 @@
final class BTreeOpHelper {
- public enum BTreeMode {
- OPEN_BTREE,
- CREATE_BTREE,
- ENLIST_BTREE
- }
+ public enum BTreeMode {
+ OPEN_BTREE, CREATE_BTREE, ENLIST_BTREE
+ }
+
+ private IBTreeInteriorFrame interiorFrame;
+ private IBTreeLeafFrame leafFrame;
- private IBTreeInteriorFrame interiorFrame;
- private IBTreeLeafFrame leafFrame;
-
- private BTree btree;
- private int btreeFileId = -1;
- private int partition;
+ private BTree btree;
+ private int btreeFileId = -1;
+ private int partition;
private AbstractBTreeOperatorDescriptor opDesc;
private IHyracksStageletContext ctx;
- private BTreeMode mode;
+ private BTreeMode mode;
BTreeOpHelper(AbstractBTreeOperatorDescriptor opDesc, final IHyracksStageletContext ctx, int partition,
BTreeMode mode) {
@@ -55,7 +53,7 @@
this.partition = partition;
}
- void init() throws HyracksDataException {
+ void init() throws HyracksDataException {
IBufferCache bufferCache = opDesc.getStorageManager().getBufferCache(ctx);
IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
@@ -63,70 +61,72 @@
FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
boolean fileIsMapped = fileMapProvider.isMapped(f);
- switch (mode) {
+ switch (mode) {
+
+ case OPEN_BTREE: {
+ if (!fileIsMapped) {
+ throw new HyracksDataException(
+ "Trying to open btree from unmapped file " + f.toString());
+ }
+ }
+ break;
- case OPEN_BTREE: {
- if (!fileIsMapped) {
- bufferCache.createFile(f);
- // throw new
- // HyracksDataException("Trying to open btree from unmapped file "
- // + fileName);
- }
- }
- break;
-
- case CREATE_BTREE:
- case ENLIST_BTREE: {
- if (!fileIsMapped) {
- bufferCache.createFile(f);
- }
- }
- break;
-
- }
+ case CREATE_BTREE:
+ case ENLIST_BTREE: {
+ if (!fileIsMapped) {
+ bufferCache.createFile(f);
+ }
+ }
+ break;
+
+ }
btreeFileId = fileMapProvider.lookupFileId(f);
bufferCache.openFile(btreeFileId);
- interiorFrame = opDesc.getInteriorFactory().getFrame();
- leafFrame = opDesc.getLeafFactory().getFrame();
+ interiorFrame = opDesc.getInteriorFactory().getFrame();
+ leafFrame = opDesc.getLeafFactory().getFrame();
BTreeRegistry btreeRegistry = opDesc.getBtreeRegistryProvider().getBTreeRegistry(ctx);
btree = btreeRegistry.get(btreeFileId);
if (btree == null) {
- // create new btree and register it
- btreeRegistry.lock();
- try {
- // check if btree has already been registered by another thread
- btree = btreeRegistry.get(btreeFileId);
- if (btree == null) {
- // this thread should create and register the btree
+ // create new btree and register it
+ btreeRegistry.lock();
+ try {
+ // check if btree has already been registered by another thread
+ btree = btreeRegistry.get(btreeFileId);
+ if (btree == null) {
+ // this thread should create and register the btree
- IBinaryComparator[] comparators = new IBinaryComparator[opDesc.getComparatorFactories().length];
- for (int i = 0; i < opDesc.getComparatorFactories().length; i++) {
- comparators[i] = opDesc.getComparatorFactories()[i].createBinaryComparator();
- }
+ IBinaryComparator[] comparators = new IBinaryComparator[opDesc
+ .getComparatorFactories().length];
+ for (int i = 0; i < opDesc.getComparatorFactories().length; i++) {
+ comparators[i] = opDesc.getComparatorFactories()[i]
+ .createBinaryComparator();
+ }
- MultiComparator cmp = new MultiComparator(opDesc.getTypeTraits(), comparators);
+ MultiComparator cmp = new MultiComparator(opDesc
+ .getTypeTraits(), comparators);
- btree = new BTree(bufferCache, opDesc.getInteriorFactory(), opDesc.getLeafFactory(), cmp);
- if (mode == BTreeMode.CREATE_BTREE) {
- MetaDataFrame metaFrame = new MetaDataFrame();
- try {
- btree.create(btreeFileId, leafFrame, metaFrame);
- btree.open(btreeFileId);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
- btreeRegistry.register(btreeFileId, btree);
- }
- } finally {
- btreeRegistry.unlock();
- }
- }
- }
+ btree = new BTree(bufferCache, opDesc.getInteriorFactory(),
+ opDesc.getLeafFactory(), cmp);
+ if (mode == BTreeMode.CREATE_BTREE) {
+ MetaDataFrame metaFrame = new MetaDataFrame();
+ try {
+ btree.create(btreeFileId, leafFrame, metaFrame);
+ btree.open(btreeFileId);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ btreeRegistry.register(btreeFileId, btree);
+ }
+ } finally {
+ btreeRegistry.unlock();
+ }
+ }
+ }
public void deinit() throws HyracksDataException {
if (btreeFileId != -1) {
@@ -135,27 +135,27 @@
}
}
- public BTree getBTree() {
- return btree;
- }
+ public BTree getBTree() {
+ return btree;
+ }
public IHyracksStageletContext getHyracksStageletContext() {
return ctx;
}
- public AbstractBTreeOperatorDescriptor getOperatorDescriptor() {
- return opDesc;
- }
+ public AbstractBTreeOperatorDescriptor getOperatorDescriptor() {
+ return opDesc;
+ }
- public IBTreeLeafFrame getLeafFrame() {
- return leafFrame;
- }
+ public IBTreeLeafFrame getLeafFrame() {
+ return leafFrame;
+ }
- public IBTreeInteriorFrame getInteriorFrame() {
- return interiorFrame;
- }
+ public IBTreeInteriorFrame getInteriorFrame() {
+ return interiorFrame;
+ }
- public int getBTreeFileId() {
- return btreeFileId;
- }
+ public int getBTreeFileId() {
+ return btreeFileId;
+ }
}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index eeee17d..415f169 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -37,165 +37,194 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangeSearchCursor;
-public class BTreeSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
- private BTreeOpHelper btreeOpHelper;
- private FrameTupleAccessor accessor;
+public class BTreeSearchOperatorNodePushable extends
+ AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private BTreeOpHelper btreeOpHelper;
+ private FrameTupleAccessor accessor;
- private ByteBuffer writeBuffer;
- private FrameTupleAppender appender;
- private ArrayTupleBuilder tb;
- private DataOutput dos;
+ private ByteBuffer writeBuffer;
+ private FrameTupleAppender appender;
+ private ArrayTupleBuilder tb;
+ private DataOutput dos;
- private BTree btree;
- private boolean isForward;
- private PermutingFrameTupleReference lowKey;
- private PermutingFrameTupleReference highKey;
- private boolean lowKeyInclusive;
- private boolean highKeyInclusive;
- private RangePredicate rangePred;
- private MultiComparator lowKeySearchCmp;
- private MultiComparator highKeySearchCmp;
- private IBTreeCursor cursor;
- private IBTreeLeafFrame cursorFrame;
- private BTreeOpContext opCtx;
+ private BTree btree;
+ private boolean isForward;
+ private PermutingFrameTupleReference lowKey;
+ private PermutingFrameTupleReference highKey;
+ private boolean lowKeyInclusive;
+ private boolean highKeyInclusive;
+ private RangePredicate rangePred;
+ private MultiComparator lowKeySearchCmp;
+ private MultiComparator highKeySearchCmp;
+ private IBTreeCursor cursor;
+ private IBTreeLeafFrame cursorFrame;
+ private BTreeOpContext opCtx;
- private RecordDescriptor recDesc;
+ private RecordDescriptor recDesc;
- public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksStageletContext ctx,
- int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward, int[] lowKeyFields,
- int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) {
- btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.OPEN_BTREE);
- this.isForward = isForward;
- this.lowKeyInclusive = lowKeyInclusive;
- this.highKeyInclusive = highKeyInclusive;
- this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
- if (lowKeyFields != null && lowKeyFields.length > 0) {
- lowKey = new PermutingFrameTupleReference();
- lowKey.setFieldPermutation(lowKeyFields);
- }
- if (highKeyFields != null && highKeyFields.length > 0) {
- highKey = new PermutingFrameTupleReference();
- highKey.setFieldPermutation(highKeyFields);
- }
- }
+ public BTreeSearchOperatorNodePushable(
+ AbstractBTreeOperatorDescriptor opDesc,
+ IHyracksStageletContext ctx, int partition,
+ IRecordDescriptorProvider recordDescProvider, boolean isForward,
+ int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
+ boolean highKeyInclusive) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition,
+ BTreeOpHelper.BTreeMode.OPEN_BTREE);
+ this.isForward = isForward;
+ this.lowKeyInclusive = lowKeyInclusive;
+ this.highKeyInclusive = highKeyInclusive;
+ this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc
+ .getOperatorId(), 0);
+ if (lowKeyFields != null && lowKeyFields.length > 0) {
+ lowKey = new PermutingFrameTupleReference();
+ lowKey.setFieldPermutation(lowKeyFields);
+ }
+ if (highKeyFields != null && highKeyFields.length > 0) {
+ highKey = new PermutingFrameTupleReference();
+ highKey.setFieldPermutation(highKeyFields);
+ }
+ }
- @Override
- public void open() throws HyracksDataException {
- AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
- accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), recDesc);
+ @Override
+ public void open() throws HyracksDataException {
+ AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper
+ .getOperatorDescriptor();
+ accessor = new FrameTupleAccessor(btreeOpHelper
+ .getHyracksStageletContext().getFrameSize(), recDesc);
- cursorFrame = opDesc.getLeafFactory().getFrame();
- cursor = new RangeSearchCursor(cursorFrame);
+ cursorFrame = opDesc.getLeafFactory().getFrame();
+ cursor = new RangeSearchCursor(cursorFrame);
- btreeOpHelper.init();
- btree = btreeOpHelper.getBTree();
+ try {
- // construct range predicate
+ btreeOpHelper.init();
+ btree = btreeOpHelper.getBTree();
- int lowKeySearchFields = btree.getMultiComparator().getComparators().length;
- int highKeySearchFields = btree.getMultiComparator().getComparators().length;
- if (lowKey != null)
- lowKeySearchFields = lowKey.getFieldCount();
- if (highKey != null)
- highKeySearchFields = highKey.getFieldCount();
+ // construct range predicate
- IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
- for (int i = 0; i < lowKeySearchFields; i++) {
- lowKeySearchComparators[i] = btree.getMultiComparator().getComparators()[i];
- }
- lowKeySearchCmp = new MultiComparator(btree.getMultiComparator().getTypeTraits(), lowKeySearchComparators);
+ int lowKeySearchFields = btree.getMultiComparator()
+ .getComparators().length;
+ int highKeySearchFields = btree.getMultiComparator()
+ .getComparators().length;
+ if (lowKey != null)
+ lowKeySearchFields = lowKey.getFieldCount();
+ if (highKey != null)
+ highKeySearchFields = highKey.getFieldCount();
- if (lowKeySearchFields == highKeySearchFields) {
- highKeySearchCmp = lowKeySearchCmp;
- } else {
- IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
- for (int i = 0; i < highKeySearchFields; i++) {
- highKeySearchComparators[i] = btree.getMultiComparator().getComparators()[i];
- }
- highKeySearchCmp = new MultiComparator(btree.getMultiComparator().getTypeTraits(), highKeySearchComparators);
+ IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
+ for (int i = 0; i < lowKeySearchFields; i++) {
+ lowKeySearchComparators[i] = btree.getMultiComparator()
+ .getComparators()[i];
+ }
+ lowKeySearchCmp = new MultiComparator(btree.getMultiComparator()
+ .getTypeTraits(), lowKeySearchComparators);
- }
+ if (lowKeySearchFields == highKeySearchFields) {
+ highKeySearchCmp = lowKeySearchCmp;
+ } else {
+ IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
+ for (int i = 0; i < highKeySearchFields; i++) {
+ highKeySearchComparators[i] = btree.getMultiComparator()
+ .getComparators()[i];
+ }
+ highKeySearchCmp = new MultiComparator(btree
+ .getMultiComparator().getTypeTraits(),
+ highKeySearchComparators);
- rangePred = new RangePredicate(isForward, null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
- highKeySearchCmp);
+ }
- accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), recDesc);
+ rangePred = new RangePredicate(isForward, null, null,
+ lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
+ highKeySearchCmp);
- writeBuffer = btreeOpHelper.getHyracksStageletContext().allocateFrame();
- tb = new ArrayTupleBuilder(btree.getMultiComparator().getFieldCount());
- dos = tb.getDataOutput();
- appender = new FrameTupleAppender(btreeOpHelper.getHyracksStageletContext().getFrameSize());
- appender.reset(writeBuffer, true);
+ accessor = new FrameTupleAccessor(btreeOpHelper
+ .getHyracksStageletContext().getFrameSize(), recDesc);
- opCtx = btree.createOpContext(BTreeOp.BTO_SEARCH, btreeOpHelper.getLeafFrame(),
- btreeOpHelper.getInteriorFrame(), null);
- }
+ writeBuffer = btreeOpHelper.getHyracksStageletContext()
+ .allocateFrame();
+ tb = new ArrayTupleBuilder(btree.getMultiComparator()
+ .getFieldCount());
+ dos = tb.getDataOutput();
+ appender = new FrameTupleAppender(btreeOpHelper
+ .getHyracksStageletContext().getFrameSize());
+ appender.reset(writeBuffer, true);
- private void writeSearchResults() throws Exception {
- while (cursor.hasNext()) {
- tb.reset();
- cursor.next();
+ opCtx = btree.createOpContext(BTreeOp.BTO_SEARCH, btreeOpHelper
+ .getLeafFrame(), btreeOpHelper.getInteriorFrame(), null);
- ITupleReference frameTuple = cursor.getTuple();
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
- tb.addFieldEndOffset();
- }
+ } catch (Exception e) {
+ btreeOpHelper.deinit();
+ }
+ }
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(writeBuffer, writer);
- appender.reset(writeBuffer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
- }
+ private void writeSearchResults() throws Exception {
+ while (cursor.hasNext()) {
+ tb.reset();
+ cursor.next();
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
+ ITupleReference frameTuple = cursor.getTuple();
+ for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+ dos.write(frameTuple.getFieldData(i), frameTuple
+ .getFieldStart(i), frameTuple.getFieldLength(i));
+ tb.addFieldEndOffset();
+ }
- int tupleCount = accessor.getTupleCount();
- try {
- for (int i = 0; i < tupleCount; i++) {
- if (lowKey != null)
- lowKey.reset(accessor, i);
- if (highKey != null)
- highKey.reset(accessor, i);
- rangePred.setLowKey(lowKey, lowKeyInclusive);
- rangePred.setHighKey(highKey, highKeyInclusive);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+ tb.getSize())) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ appender.reset(writeBuffer, true);
+ if (!appender.append(tb.getFieldEndOffsets(),
+ tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
- cursor.reset();
- btree.search(cursor, rangePred, opCtx);
- writeSearchResults();
- }
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
- @Override
- public void close() throws HyracksDataException {
- try {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
- writer.close();
- try {
- cursor.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- } finally {
- btreeOpHelper.deinit();
- }
- }
+ int tupleCount = accessor.getTupleCount();
+ try {
+ for (int i = 0; i < tupleCount; i++) {
+ if (lowKey != null)
+ lowKey.reset(accessor, i);
+ if (highKey != null)
+ highKey.reset(accessor, i);
+ rangePred.setLowKey(lowKey, lowKeyInclusive);
+ rangePred.setHighKey(highKey, highKeyInclusive);
- @Override
- public void flush() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
- }
+ cursor.reset();
+ btree.search(cursor, rangePred, opCtx);
+ writeSearchResults();
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ writer.close();
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ } finally {
+ btreeOpHelper.deinit();
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangeSearchCursor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangeSearchCursor.java
index 433cc6a..2ef1905 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangeSearchCursor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangeSearchCursor.java
@@ -60,8 +60,10 @@
if (page != null) {
page.releaseReadLatch();
bufferCache.unpin(page);
- page = null;
}
+ tupleIndex = 0;
+ page = null;
+ pred = null;
}
public ITupleReference getTuple() {
@@ -217,9 +219,11 @@
@Override
public void reset() {
- tupleIndex = 0;
- page = null;
- pred = null;
+ try {
+ close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
@Override