added btree search and btree bulkload operators and made some interfaces serializable
git-svn-id: https://hyracks.googlecode.com/svn/trunk@100 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks/hyracks-storage-am-btree/pom.xml
index 06cac47..5d91b5a 100644
--- a/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks/hyracks-storage-am-btree/pom.xml
@@ -39,6 +39,13 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.1.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeInteriorFrameFactory.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeInteriorFrameFactory.java
index a9d7805..8383fed 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeInteriorFrameFactory.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeInteriorFrameFactory.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.btree.api;
-public interface IBTreeInteriorFrameFactory {
+import java.io.Serializable;
+
+public interface IBTreeInteriorFrameFactory extends Serializable {
public IBTreeInteriorFrame getFrame();
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeLeafFrameFactory.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeLeafFrameFactory.java
index 591f81d..0855a72 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeLeafFrameFactory.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeLeafFrameFactory.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.btree.api;
-public interface IBTreeLeafFrameFactory {
+import java.io.Serializable;
+
+public interface IBTreeLeafFrameFactory extends Serializable {
public IBTreeLeafFrame getFrame();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IFieldAccessor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IFieldAccessor.java
index a076f0f..55a25ea 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IFieldAccessor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IFieldAccessor.java
@@ -15,7 +15,9 @@
package edu.uci.ics.hyracks.storage.am.btree.api;
-public interface IFieldAccessor {
+import java.io.Serializable;
+
+public interface IFieldAccessor extends Serializable {
public int getLength(byte[] data, int offset); // skip to next field (equivalent to adding length of field to offset)
public String print(byte[] data, int offset); // debug
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/ISearchPredicate.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/ISearchPredicate.java
index e6c7509..334f2f3 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/ISearchPredicate.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/ISearchPredicate.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.btree.api;
-public interface ISearchPredicate {
+import java.io.Serializable;
+
+public interface ISearchPredicate extends Serializable {
public boolean isForward();
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorDescriptor.java
new file mode 100644
index 0000000..72a9f12
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorDescriptor.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+
+public abstract class AbstractBTreeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ protected String btreeFileName;
+ protected int btreeFileId;
+
+ protected MultiComparator cmp;
+ protected RangePredicate rangePred;
+
+ protected IBTreeInteriorFrameFactory interiorFrameFactory;
+ protected IBTreeLeafFrameFactory leafFrameFactory;
+
+ protected IBufferCacheProvider bufferCacheProvider;
+ protected IBTreeRegistryProvider btreeRegistryProvider;
+
+ public AbstractBTreeOperatorDescriptor(JobSpecification spec, int inputArity, int outputArity, IFileSplitProvider fileSplitProvider, RecordDescriptor recDesc, IBufferCacheProvider bufferCacheProvider, IBTreeRegistryProvider btreeRegistryProvider, int btreeFileId, String btreeFileName, IBTreeInteriorFrameFactory interiorFactory, IBTreeLeafFrameFactory leafFactory, MultiComparator cmp, RangePredicate rangePred) {
+ super(spec, inputArity, outputArity);
+ this.cmp = cmp;
+ this.rangePred = rangePred;
+ this.btreeFileId = btreeFileId;
+ this.btreeFileName = btreeFileName;
+ this.bufferCacheProvider = bufferCacheProvider;
+ this.btreeRegistryProvider = btreeRegistryProvider;
+ this.interiorFrameFactory = interiorFactory;
+ this.leafFrameFactory = leafFactory;
+ if(outputArity > 0) recordDescriptors[0] = recDesc;
+ }
+
+ public String getBtreeFileName() {
+ return btreeFileName;
+ }
+
+ public int getBtreeFileId() {
+ return btreeFileId;
+ }
+
+ public MultiComparator getMultiComparator() {
+ return cmp;
+ }
+
+ public RangePredicate getRangePredicate() {
+ return rangePred;
+ }
+
+ public IBTreeInteriorFrameFactory getInteriorFactory() {
+ return interiorFrameFactory;
+ }
+
+ public IBTreeLeafFrameFactory getLeafFactory() {
+ return leafFrameFactory;
+ }
+
+ public IBufferCacheProvider getBufferCacheProvider() {
+ return bufferCacheProvider;
+ }
+
+ public IBTreeRegistryProvider getBtreeRegistryProvider() {
+ return btreeRegistryProvider;
+ }
+
+ public RecordDescriptor getRecordDescriptor() {
+ return recordDescriptors[0];
+ }
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java
new file mode 100644
index 0000000..ef0ab4d
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java
@@ -0,0 +1,152 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.FileInfo;
+import edu.uci.ics.hyracks.storage.common.file.FileManager;
+
+/**
+ * Base class for the runtime side of the B-tree operators. It keeps the
+ * descriptor and context, and {@link #init()} resolves the shared
+ * {@link BTree} instance and allocates the frames used to operate on it.
+ */
+public abstract class AbstractBTreeOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
+
+    protected IBTreeInteriorFrame interiorFrame;
+    protected IBTreeLeafFrame leafFrame;
+
+    protected BTree btree;
+
+    protected AbstractBTreeOperatorDescriptor opDesc;
+    protected IHyracksContext ctx;
+
+    /**
+     * @param opDesc descriptor supplying the B-tree configuration
+     * @param ctx    context of the running operator
+     */
+    public AbstractBTreeOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx) {
+        this.opDesc = opDesc;
+        this.ctx = ctx;
+    }
+
+    /**
+     * Opens the B-tree file, tries to register it with the file manager, looks
+     * up (or creates and registers) the {@link BTree} for the descriptor's file
+     * id, and allocates one interior and one leaf frame.
+     *
+     * @throws FileNotFoundException if the B-tree file cannot be opened in "rw" mode
+     */
+    public void init() throws FileNotFoundException {
+        IBufferCache bufferCache = opDesc.getBufferCacheProvider().getBufferCache();
+        FileManager fileManager = opDesc.getBufferCacheProvider().getFileManager();
+        int fileId = opDesc.getBtreeFileId();
+
+        File f = new File(opDesc.getBtreeFileName());
+        RandomAccessFile raf = new RandomAccessFile(f, "rw");
+
+        boolean registered = false;
+        try {
+            FileInfo fi = new FileInfo(fileId, raf);
+            fileManager.registerFile(fi);
+            registered = true;
+        } catch (Exception ignored) {
+            // Registration stays best effort, exactly as before.
+            // NOTE(review): presumably this fails when another operator
+            // instance already registered the same file id - confirm.
+        } finally {
+            if (!registered) {
+                // The handle was not handed over to the file manager, so
+                // nobody else will ever close it; release it here.
+                try {
+                    raf.close();
+                } catch (java.io.IOException closeIgnored) {
+                    // nothing useful can be done if closing fails
+                }
+            }
+        }
+
+        // Lookup and creation both happen while holding the registry lock, so
+        // the lookup cannot race with a concurrent register() and at most one
+        // thread creates the btree for a given file id.
+        BTreeRegistry btreeRegistry = opDesc.getBtreeRegistryProvider().getBTreeRegistry();
+        btreeRegistry.lock();
+        try {
+            btree = btreeRegistry.get(fileId);
+            if (btree == null) {
+                // this thread should create and register the btree
+                btree = new BTree(bufferCache,
+                        opDesc.getInteriorFactory(),
+                        opDesc.getLeafFactory(),
+                        opDesc.getMultiComparator());
+                btree.open(fileId);
+                btreeRegistry.register(fileId, btree);
+            }
+        } finally {
+            btreeRegistry.unlock();
+        }
+
+        interiorFrame = opDesc.getInteriorFactory().getFrame();
+        leafFrame = opDesc.getLeafFactory().getFrame();
+    }
+
+    /**
+     * Debug helper: calls {@code btree.create} for the descriptor's file id and
+     * then inserts 10000 two-integer records. The first field is
+     * {@code rnd.nextInt() % 10000} from a generator seeded with 50, the second
+     * field is always 5. Progress is printed every 1000 records.
+     *
+     * @throws Exception if creating the tree or serializing a record fails
+     */
+    protected void fill() throws Exception {
+        MetaDataFrame metaFrame = new MetaDataFrame();
+
+        btree.create(opDesc.getBtreeFileId(), leafFrame, metaFrame);
+
+        Random rnd = new Random();
+        rnd.setSeed(50);
+
+        for (int i = 0; i < 10000; i++) {
+            ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
+            DataOutputStream dos = new DataOutputStream(baaos);
+
+            int f0 = rnd.nextInt() % 10000;
+            int f1 = 5;
+
+            IntegerSerializerDeserializer.INSTANCE.serialize(f0, dos);
+            IntegerSerializerDeserializer.INSTANCE.serialize(f1, dos);
+
+            byte[] record = baaos.toByteArray();
+
+            if (i % 1000 == 0) {
+                System.out.println("INSERTING " + i + " : " + f0 + " " + f1);
+            }
+
+            try {
+                btree.insert(record, leafFrame, interiorFrame, metaFrame);
+            } catch (Exception e) {
+                // Insert failures are deliberately ignored by this debug helper.
+                // NOTE(review): presumably duplicate keys from the random
+                // generator - confirm.
+            }
+        }
+
+        /*
+        IFieldAccessor[] fields = new IFieldAccessor[2];
+        fields[0] = new Int32Accessor(); // key field
+        fields[1] = new Int32Accessor(); // value field
+
+        int keyLen = 1;
+        IBinaryComparator[] cmps = new IBinaryComparator[keyLen];
+        cmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        MultiComparator cmp = new MultiComparator(cmps, fields);
+
+        ByteArrayAccessibleOutputStream lkbaaos = new ByteArrayAccessibleOutputStream();
+        DataOutputStream lkdos = new DataOutputStream(lkbaaos);
+        IntegerSerializerDeserializer.INSTANCE.serialize(-1000, lkdos);
+
+        ByteArrayAccessibleOutputStream hkbaaos = new ByteArrayAccessibleOutputStream();
+        DataOutputStream hkdos = new DataOutputStream(hkbaaos);
+        IntegerSerializerDeserializer.INSTANCE.serialize(1000, hkdos);
+
+        byte[] lowKey = lkbaaos.toByteArray();
+        byte[] highKey = hkbaaos.toByteArray();
+
+        IBinaryComparator[] searchCmps = new IBinaryComparator[1];
+        searchCmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        MultiComparator searchCmp = new MultiComparator(searchCmps, fields);
+
+        RangePredicate rangePred = new RangePredicate(true, lowKey, highKey, searchCmp);
+        btree.search(cursor, rangePred, leafFrame, interiorFrame);
+        try {
+            while (cursor.hasNext()) {
+                cursor.next();
+                byte[] array = cursor.getPage().getBuffer().array();
+                int recOffset = cursor.getOffset();
+                String rec = cmp.printRecord(array, recOffset);
+                System.out.println(rec);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            cursor.close();
+        }
+        */
+    }
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorDescriptor.java
new file mode 100644
index 0000000..c559cf6
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorDescriptor.java
@@ -0,0 +1,46 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+/**
+ * Descriptor of the B-tree bulk-load operator. It is declared with one input
+ * and no output, passes no range predicate to its superclass, and creates a
+ * {@link BTreeBulkLoadOperatorNodePushable} as its runtime.
+ */
+public class BTreeBulkLoadOperatorDescriptor extends AbstractBTreeOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] keyFields;
+    private final int[] payloadFields;
+
+    private float fillFactor;
+
+    /**
+     * @param keyFields     input field indexes handed to the runtime as key fields
+     * @param payloadFields input field indexes handed to the runtime as payload fields
+     * @param fillFactor    fill factor handed to the runtime
+     */
+    public BTreeBulkLoadOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider,
+            RecordDescriptor recDesc, IBufferCacheProvider bufferCacheProvider,
+            IBTreeRegistryProvider btreeRegistryProvider, int btreeFileId, String btreeFileName,
+            IBTreeInteriorFrameFactory interiorFactory, IBTreeLeafFrameFactory leafFactory,
+            MultiComparator cmp, int[] keyFields, int[] payloadFields, float fillFactor) {
+        // one input, zero outputs, no range predicate
+        super(spec, 1, 0, fileSplitProvider, recDesc, bufferCacheProvider, btreeRegistryProvider,
+                btreeFileId, btreeFileName, interiorFactory, leafFactory, cmp, null);
+        this.fillFactor = fillFactor;
+        this.keyFields = keyFields;
+        this.payloadFields = payloadFields;
+    }
+
+    /**
+     * Creates the bulk-load runtime for this descriptor. The environment and
+     * the partition arguments are not used.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        BTreeBulkLoadOperatorNodePushable runtime = new BTreeBulkLoadOperatorNodePushable(
+                this, ctx, keyFields, payloadFields, fillFactor, recordDescProvider);
+        return runtime;
+    }
+
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
new file mode 100644
index 0000000..58b3265
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+public class BTreeBulkLoadOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
+
+ private final int[] keyFields;
+ private final int[] payloadFields;
+
+ private float fillFactor;
+
+ private FrameTupleAccessor accessor;
+ private BTree.BulkLoadContext bulkLoadCtx;
+
+ private IRecordDescriptorProvider recordDescProvider;
+
+ public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int[] keyFields, int[] payloadFields, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
+ super(opDesc, ctx);
+ this.keyFields = keyFields;
+ this.payloadFields = payloadFields;
+ this.fillFactor = fillFactor;
+ this.recordDescProvider = recordDescProvider;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ btree.endBulkLoad(bulkLoadCtx);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+
+ // build record for insertion into btree
+ int tupleCount = accessor.getTupleCount();
+ for(int i = 0; i < tupleCount; i++) {
+ // determine size of record
+ int btreeRecordSize = 0;
+ for(int j = 0; j < keyFields.length; j++) {
+ btreeRecordSize += accessor.getFieldLength(i, keyFields[j]);
+ }
+ for(int j = 0; j < payloadFields.length; j++) {
+ btreeRecordSize += accessor.getFieldLength(i, payloadFields[j]);
+ }
+
+ MultiComparator cmp = opDesc.getMultiComparator();
+
+ // allocate record and copy fields
+ byte[] btreeRecord = new byte[btreeRecordSize];
+ int recRunner = 0;
+ for(int j = 0; j < keyFields.length; j++) {
+ int fieldStartOff = accessor.getTupleStartOffset(i) + + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(i, keyFields[j]);
+ int fieldLength = accessor.getFieldLength(i, keyFields[j]);
+
+ String rec = cmp.printKey(buffer.array(), fieldStartOff);
+ System.out.println("REC: " + rec);
+
+ System.arraycopy(buffer.array(), fieldStartOff, btreeRecord, recRunner, fieldLength);
+ recRunner += fieldLength;
+ }
+ for(int j = 0; j < payloadFields.length; j++) {
+ int fieldStartOff = accessor.getTupleStartOffset(i) + + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(i, payloadFields[j]);
+ int fieldLength = accessor.getFieldLength(i, payloadFields[j]);
+ System.arraycopy(buffer.array(), fieldStartOff, btreeRecord, recRunner, fieldLength);
+ recRunner += fieldLength;
+ }
+
+ // append to btree
+ try {
+ btree.bulkLoadAddRecord(bulkLoadCtx, btreeRecord);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ accessor = new FrameTupleAccessor(ctx, recDesc);
+ IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
+ try {
+ init();
+ btree.open(opDesc.getBtreeFileId());
+ bulkLoadCtx = btree.beginBulkLoad(fillFactor, leafFrame, interiorFrame, metaFrame);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorDescriptor.java
new file mode 100644
index 0000000..6172f60
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorDescriptor.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+/**
+ * Descriptor of the B-tree disk-order scan operator. It is declared with no
+ * input and one output, passes no range predicate to its superclass, and
+ * creates a {@link BTreeDiskOrderScanOperatorNodePushable} as its runtime.
+ */
+public class BTreeDiskOrderScanOperatorDescriptor extends AbstractBTreeOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public BTreeDiskOrderScanOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider,
+            RecordDescriptor recDesc, IBufferCacheProvider bufferCacheProvider,
+            IBTreeRegistryProvider btreeRegistryProvider, int btreeFileId, String btreeFileName,
+            IBTreeInteriorFrameFactory interiorFactory, IBTreeLeafFrameFactory leafFactory,
+            MultiComparator cmp) {
+        // zero inputs, one output, no range predicate
+        super(spec, 0, 1, fileSplitProvider, recDesc, bufferCacheProvider, btreeRegistryProvider,
+                btreeFileId, btreeFileName, interiorFactory, leafFactory, cmp, null);
+    }
+
+    /**
+     * Creates the scan runtime for this descriptor. Only the context is used;
+     * the remaining arguments are ignored.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        BTreeDiskOrderScanOperatorNodePushable runtime = new BTreeDiskOrderScanOperatorNodePushable(this, ctx);
+        return runtime;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
new file mode 100644
index 0000000..b54952a
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -0,0 +1,93 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.DataOutput;
+import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.DiskOrderScanCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+/**
+ * Runtime of the B-tree disk-order scan operator. {@link #open()} does all the
+ * work: it scans the tree with a {@link DiskOrderScanCursor}, turns every
+ * record into a tuple and pushes the filled frames to the writer. Failures are
+ * reported to the caller as {@link HyracksDataException} instead of being
+ * printed and ignored.
+ */
+public class BTreeDiskOrderScanOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
+
+    public BTreeDiskOrderScanOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx) {
+        super(opDesc, ctx);
+    }
+
+    /**
+     * Initializes the B-tree, starts the disk-order scan, writes every record
+     * as one tuple (one tuple field per comparator field) and closes the writer.
+     *
+     * @throws HyracksDataException if initialization, the scan, or writing the
+     *         output fails; no further records are produced in that case
+     */
+    @Override
+    public void open() throws HyracksDataException {
+
+        IBTreeLeafFrame cursorFrame = opDesc.getLeafFactory().getFrame();
+        DiskOrderScanCursor cursor = new DiskOrderScanCursor(cursorFrame);
+        IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
+
+        try {
+            init();
+            // NOTE(review): fill() is marked as a debug helper in the base
+            // class and inserts generated records before the scan - confirm
+            // that it is still wanted here.
+            fill();
+            btree.diskOrderScan(cursor, cursorFrame, metaFrame);
+        } catch (Exception e) {
+            // Do not go on to iterate a cursor whose scan never started.
+            throw asDataException(e);
+        }
+
+        MultiComparator cmp = opDesc.getMultiComparator();
+        int fieldCount = cmp.getFields().length;
+        ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+        FrameTupleAppender appender = new FrameTupleAppender(ctx);
+        appender.reset(frame, true);
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
+        DataOutput dos = tb.getDataOutput();
+
+        try {
+            while (cursor.hasNext()) {
+                tb.reset();
+                cursor.next();
+
+                // walk the record field by field using the field accessors
+                int recRunner = cursor.getOffset();
+                byte[] array = cursor.getPage().getBuffer().array();
+                for (int i = 0; i < fieldCount; i++) {
+                    int fieldLen = cmp.getFields()[i].getLength(array, recRunner);
+                    dos.write(array, recRunner, fieldLen);
+                    recRunner += fieldLen;
+                    tb.addFieldEndOffset();
+                }
+
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    // frame is full: flush it and retry on the emptied frame
+                    FrameUtils.flushFrame(frame, writer);
+                    appender.reset(frame, true);
+                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+            }
+
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+            writer.close();
+
+        } catch (Exception e) {
+            throw asDataException(e);
+        }
+    }
+
+    /** This operator has no input; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Does nothing; the writer is closed at the end of {@link #open()}. */
+    @Override
+    public void close() throws HyracksDataException {
+    }
+
+    /** Returns {@code e} itself if it already is a HyracksDataException, otherwise wraps it as the cause. */
+    private static HyracksDataException asDataException(Exception e) {
+        if (e instanceof HyracksDataException) {
+            return (HyracksDataException) e;
+        }
+        return new HyracksDataException(e);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistry.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistry.java
new file mode 100644
index 0000000..24b133b
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistry.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+
+/**
+ * Maps file ids to {@link BTree} instances.
+ *
+ * Every accessor takes the registry lock itself, so single calls are safe
+ * without further locking. The lock is a {@link ReentrantLock}, so callers
+ * that need a compound check-then-register step can still bracket several
+ * calls with {@link #lock()} and {@link #unlock()}.
+ */
+public class BTreeRegistry {
+
+    private final HashMap<Integer, BTree> map = new HashMap<Integer, BTree>();
+    private final Lock registryLock = new ReentrantLock();
+
+    /**
+     * @param fileId file id to look up
+     * @return the btree registered under {@code fileId}, or null if there is none
+     */
+    public BTree get(int fileId) {
+        registryLock.lock();
+        try {
+            return map.get(fileId);
+        } finally {
+            registryLock.unlock();
+        }
+    }
+
+    // TODO: not very high concurrency, but good enough for now
+    /** Acquires the registry lock; must be paired with {@link #unlock()}. */
+    public void lock() {
+        registryLock.lock();
+    }
+
+    /** Releases the registry lock acquired by {@link #lock()}. */
+    public void unlock() {
+        registryLock.unlock();
+    }
+
+    /** Registers {@code btree} under {@code fileId}, replacing any previous entry. */
+    public void register(int fileId, BTree btree) {
+        registryLock.lock();
+        try {
+            map.put(fileId, btree);
+        } finally {
+            registryLock.unlock();
+        }
+    }
+
+    /** Removes the entry for {@code fileId}, if any. */
+    public void unregister(int fileId) {
+        registryLock.lock();
+        try {
+            map.remove(fileId);
+        } finally {
+            registryLock.unlock();
+        }
+    }
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistryProvider.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistryProvider.java
new file mode 100644
index 0000000..832f8e9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistryProvider.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+/**
+ * Hands out the single {@link BTreeRegistry} shared by all providers loaded
+ * in the same class loader.
+ */
+public class BTreeRegistryProvider implements IBTreeRegistryProvider {
+
+    private static final long serialVersionUID = 1L;
+
+    // Created during class initialization rather than lazily on first use:
+    // the unsynchronized null check used before could let two threads each
+    // create their own registry.
+    private static final BTreeRegistry btreeRegistry = new BTreeRegistry();
+
+    /** @return the shared registry; never null */
+    @Override
+    public BTreeRegistry getBTreeRegistry() {
+        return btreeRegistry;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
new file mode 100644
index 0000000..1e17b12
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -0,0 +1,84 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.DataOutputStream;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
+import edu.uci.ics.hyracks.storage.am.btree.frames.NSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.NSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.btree.types.Int32Accessor;
+
+/**
+ * Descriptor of the B-tree range-search operator. It is declared with no input
+ * and one output and creates a {@link BTreeSearchOperatorNodePushable} as its
+ * runtime. The class also carries a small {@link #main(String[])} driver.
+ */
+public class BTreeSearchOperatorDescriptor extends AbstractBTreeOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public BTreeSearchOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider,
+            RecordDescriptor recDesc, IBufferCacheProvider bufferCacheProvider,
+            IBTreeRegistryProvider btreeRegistryProvider, int btreeFileId, String btreeFileName,
+            IBTreeInteriorFrameFactory interiorFactory, IBTreeLeafFrameFactory leafFactory,
+            MultiComparator cmp, RangePredicate rangePred) {
+        // zero inputs, one output
+        super(spec, 0, 1, fileSplitProvider, recDesc, bufferCacheProvider, btreeRegistryProvider,
+                btreeFileId, btreeFileName, interiorFactory, leafFactory, cmp, rangePred);
+    }
+
+    /**
+     * Creates the search runtime for this descriptor. Only the context is
+     * used; the remaining arguments are ignored.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        BTreeSearchOperatorNodePushable runtime = new BTreeSearchOperatorNodePushable(this, ctx);
+        return runtime;
+    }
+
+    /**
+     * Driver: builds a search descriptor over "/tmp/btreetest.bin" with two
+     * int32 fields (the first one is the key), a forward range predicate from
+     * -1000 to 1000, and opens the resulting runtime with a null context.
+     */
+    public static void main(String args[]) throws HyracksDataException {
+
+        IBTreeInteriorFrameFactory interiorFactory = new NSMInteriorFrameFactory();
+        IBTreeLeafFrameFactory leafFactory = new NSMLeafFrameFactory();
+
+        // field 0 is the key, field 1 the value
+        IFieldAccessor[] fieldAccessors = new IFieldAccessor[] { new Int32Accessor(), new Int32Accessor() };
+
+        int keyLen = 1;
+        IBinaryComparator[] treeCmps = new IBinaryComparator[keyLen];
+        treeCmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        MultiComparator treeCmp = new MultiComparator(treeCmps, fieldAccessors);
+
+        // serialized bounds of the search range
+        byte[] lowKey = serializedInt(-1000);
+        byte[] highKey = serializedInt(1000);
+
+        IBinaryComparator[] searchCmps = new IBinaryComparator[] {
+                IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator() };
+        MultiComparator searchCmp = new MultiComparator(searchCmps, fieldAccessors);
+
+        RangePredicate predicate = new RangePredicate(true, lowKey, highKey, searchCmp);
+
+        IBufferCacheProvider cacheProvider = new BufferCacheProvider();
+        IBTreeRegistryProvider registryProvider = new BTreeRegistryProvider();
+
+        ISerializerDeserializer[] serdes = new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor recDesc = new RecordDescriptor(serdes);
+
+        JobSpecification spec = new JobSpecification();
+        BTreeSearchOperatorDescriptor searchDesc = new BTreeSearchOperatorDescriptor(spec, null, recDesc,
+                cacheProvider, registryProvider, 0, "/tmp/btreetest.bin", interiorFactory, leafFactory,
+                treeCmp, predicate);
+        IOperatorNodePushable runtime = searchDesc.createPushRuntime(null, null, null, 0, 0);
+        runtime.open();
+    }
+
+    /** Serializes one int with IntegerSerializerDeserializer and returns the written bytes. */
+    private static byte[] serializedInt(int value) throws HyracksDataException {
+        ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
+        DataOutputStream dos = new DataOutputStream(baaos);
+        IntegerSerializerDeserializer.INSTANCE.serialize(value, dos);
+        return baaos.toByteArray();
+    }
+
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
new file mode 100644
index 0000000..88d263e
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -0,0 +1,102 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.DataOutput;
+import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeCursor;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangeSearchCursor;
+
+/**
+ * Push runtime of the BTree search operator. The whole search runs inside
+ * {@link #open()}: the tree is searched with the descriptor's range predicate
+ * and each record returned by the cursor is copied into frames that are
+ * flushed to the output writer.
+ */
+public class BTreeSearchOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
+
+    public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx) {
+        super(opDesc, ctx);
+    }
+
+    /**
+     * Searches the tree and pushes every matching record to the writer, then
+     * closes the writer.
+     *
+     * @throws HyracksDataException
+     *             if initialization, the search or producing output fails; any
+     *             other exception is wrapped so that the caller sees the failure
+     *             instead of a stack trace on stderr
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        IBTreeLeafFrame cursorFrame = opDesc.getLeafFactory().getFrame();
+        IBTreeCursor cursor = new RangeSearchCursor(cursorFrame);
+
+        try {
+            init();
+            fill();
+            btree.search(cursor, opDesc.getRangePredicate(), leafFrame, interiorFrame);
+
+            MultiComparator cmp = opDesc.getMultiComparator();
+            int numFields = cmp.getFields().length;
+            ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+            FrameTupleAppender appender = new FrameTupleAppender(ctx);
+            appender.reset(frame, true);
+            ArrayTupleBuilder tb = new ArrayTupleBuilder(numFields);
+            DataOutput dos = tb.getDataOutput();
+
+            while (cursor.hasNext()) {
+                tb.reset();
+                cursor.next();
+
+                // copy the current record into the tuple builder one field at a time
+                int recRunner = cursor.getOffset();
+                byte[] array = cursor.getPage().getBuffer().array();
+                for (int i = 0; i < numFields; i++) {
+                    int fieldLen = cmp.getFields()[i].getLength(array, recRunner);
+                    dos.write(array, recRunner, fieldLen);
+                    recRunner += fieldLen;
+                    tb.addFieldEndOffset();
+                }
+
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    // frame is full: flush it and retry with the emptied frame
+                    FrameUtils.flushFrame(frame, writer);
+                    appender.reset(frame, true);
+                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new IllegalStateException("record does not fit into an empty frame");
+                    }
+                }
+            }
+
+            // flush the last, partially filled frame
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+            writer.close();
+        } catch (HyracksDataException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Always throws {@link UnsupportedOperationException}. */
+    @Override
+    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Does nothing; the writer is closed at the end of {@link #open()}. */
+    @Override
+    public void close() throws HyracksDataException {
+    }
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BufferCacheProvider.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BufferCacheProvider.java
new file mode 100644
index 0000000..6881cf7
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BufferCacheProvider.java
@@ -0,0 +1,70 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.file.FileManager;
+
+/**
+ * Provides a buffer cache and a file manager that are kept in static fields,
+ * so all instances of this class share the same two objects. Both are created
+ * on first request.
+ */
+public class BufferCacheProvider implements IBufferCacheProvider {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int PAGE_SIZE = 8192;
+    private static final int NUM_PAGES = 40;
+
+    // Lazily created; every access holds the BufferCacheProvider.class lock.
+    private static IBufferCache bufferCache = null;
+    private static FileManager fileManager = null;
+
+    /**
+     * Returns the shared buffer cache, creating it (and the file manager it
+     * is built with) on the first call.
+     */
+    @Override
+    public IBufferCache getBufferCache() {
+        // The lock keeps concurrent first calls from building two caches.
+        synchronized (BufferCacheProvider.class) {
+            if (bufferCache == null) {
+                if (fileManager == null) {
+                    fileManager = new FileManager();
+                }
+                ICacheMemoryAllocator allocator = new BufferAllocator();
+                IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
+                bufferCache = new BufferCache(allocator, prs, fileManager, PAGE_SIZE, NUM_PAGES);
+            }
+            return bufferCache;
+        }
+    }
+
+    /** Returns the shared file manager, creating it on the first call. */
+    @Override
+    public FileManager getFileManager() {
+        synchronized (BufferCacheProvider.class) {
+            if (fileManager == null) {
+                fileManager = new FileManager();
+            }
+            return fileManager;
+        }
+    }
+
+    /** Allocates the requested pages as heap {@link ByteBuffer}s. */
+    public class BufferAllocator implements ICacheMemoryAllocator {
+        @Override
+        public ByteBuffer[] allocate(int pageSize, int numPages) {
+            ByteBuffer[] buffers = new ByteBuffer[numPages];
+            for (int i = 0; i < numPages; ++i) {
+                buffers[i] = ByteBuffer.allocate(pageSize);
+            }
+            return buffers;
+        }
+    }
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeOperatorDescriptor.java
new file mode 100644
index 0000000..b25aec2
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeOperatorDescriptor.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+
+/**
+ * Accessors for the configuration of a BTree dataflow operator: the BTree
+ * file, the comparator and range predicate, the frame factories, and the
+ * providers of the buffer cache and the BTree registry.
+ */
+public interface IBTreeOperatorDescriptor {
+
+    /** @return the BTree file name */
+    String getBTreeFileName();
+
+    /** @return the BTree file id */
+    int getBTreeFileId();
+
+    /** @return the multi-field comparator */
+    MultiComparator getMultiComparator();
+
+    /** @return the range predicate */
+    RangePredicate getRangePred();
+
+    /** @return the factory for interior frames */
+    IBTreeInteriorFrameFactory getInteriorFactory();
+
+    /** @return the factory for leaf frames */
+    IBTreeLeafFrameFactory getLeafFactory();
+
+    /** @return the buffer cache provider */
+    IBufferCacheProvider getBufferCacheProvider();
+
+    /** @return the BTree registry provider */
+    IBTreeRegistryProvider getBTreeRegistryProvider();
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeRegistryProvider.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeRegistryProvider.java
new file mode 100644
index 0000000..ec9a007
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeRegistryProvider.java
@@ -0,0 +1,10 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.Serializable;
+
+/** Serializable source of a {@link BTreeRegistry}. */
+public interface IBTreeRegistryProvider extends Serializable {
+
+    /** @return the BTree registry */
+    BTreeRegistry getBTreeRegistry();
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBufferCacheProvider.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBufferCacheProvider.java
new file mode 100644
index 0000000..6f0edc6
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBufferCacheProvider.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.FileManager;
+
+/** Serializable source of a buffer cache and a file manager. */
+public interface IBufferCacheProvider extends Serializable {
+
+    /** @return the buffer cache */
+    IBufferCache getBufferCache();
+
+    /** @return the file manager */
+    FileManager getFileManager();
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index f1353b1..268aa4a 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -41,6 +41,8 @@
// maxPage
private final int rootPage = 1; // the root page never changes
+    private volatile boolean created = false; // volatile: create() reads it before taking treeLatch
+
private final IBufferCache bufferCache;
private int fileId;
private final IBTreeInteriorFrameFactory interiorFrameFactory;
@@ -64,6 +66,7 @@
public int usefulCompression = 0;
public int uselessCompression = 0;
+
public String printStats() {
StringBuilder strBuilder = new StringBuilder();
@@ -89,39 +92,58 @@
}
public void create(int fileId, IBTreeLeafFrame leafFrame, IBTreeMetaDataFrame metaFrame) throws Exception {
- // initialize meta data page
- ICachedPage metaNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, metaDataPage), false);
- pins++;
-
- metaNode.acquireWriteLatch();
- writeLatchesAcquired++;
- try {
- metaFrame.setPage(metaNode);
- metaFrame.initBuffer((byte) -1);
- metaFrame.setMaxPage(rootPage);
- } finally {
- metaNode.releaseWriteLatch();
- writeLatchesReleased++;
- bufferCache.unpin(metaNode);
- unpins++;
- }
-
- // initialize root page
- ICachedPage rootNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, rootPage), true);
- pins++;
-
- rootNode.acquireWriteLatch();
- writeLatchesAcquired++;
- try {
- leafFrame.setPage(rootNode);
- leafFrame.initBuffer((byte) 0);
- } finally {
- rootNode.releaseWriteLatch();
- writeLatchesReleased++;
- bufferCache.unpin(rootNode);
- unpins++;
- }
- currentLevel = 0;
+        // One-time setup of a new tree: initializes the meta-data page and the
+        // root page. Calls made after a successful create() return immediately.
+        if (created) {
+            return;
+        }
+
+        treeLatch.writeLock().lock();
+        try {
+            // check if another thread beat us to it
+            if (created) {
+                return;
+            }
+
+            // initialize meta data page
+            ICachedPage metaNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, metaDataPage), false);
+            pins++;
+
+            metaNode.acquireWriteLatch();
+            writeLatchesAcquired++;
+            try {
+                metaFrame.setPage(metaNode);
+                metaFrame.initBuffer((byte) -1);
+                metaFrame.setMaxPage(rootPage);
+            } finally {
+                metaNode.releaseWriteLatch();
+                writeLatchesReleased++;
+                bufferCache.unpin(metaNode);
+                unpins++;
+            }
+
+            // initialize root page
+            ICachedPage rootNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, rootPage), true);
+            pins++;
+
+            rootNode.acquireWriteLatch();
+            writeLatchesAcquired++;
+            try {
+                leafFrame.setPage(rootNode);
+                leafFrame.initBuffer((byte) 0);
+            } finally {
+                rootNode.releaseWriteLatch();
+                writeLatchesReleased++;
+                bufferCache.unpin(rootNode);
+                unpins++;
+            }
+            currentLevel = 0;
+
+            // only set after both pages are initialized, so a failed attempt can be retried
+            created = true;
+        } finally {
+            treeLatch.writeLock().unlock();
+        }
}
public void open(int fileId) {
@@ -1237,7 +1257,7 @@
int spaceNeeded = record.length + ctx.slotSize;
if (leafFrontier.bytesInserted + spaceNeeded > ctx.leafMaxBytes) {
int splitKeySize = cmp.getKeySize(leafFrontier.lastRecord, 0);
- ctx.splitKey.initData(splitKeySize);
+ ctx.splitKey.initData(splitKeySize);
System.arraycopy(leafFrontier.lastRecord, 0, ctx.splitKey.getData(), 0, splitKeySize);
ctx.splitKey.setLeftPage(leafFrontier.pageId);
int prevPageId = leafFrontier.pageId;
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java
index fd8a397..243a7a2 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java
@@ -15,11 +15,15 @@
package edu.uci.ics.hyracks.storage.am.btree.impls;
+import java.io.Serializable;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
-public class MultiComparator {
-
+public class MultiComparator implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
private IBinaryComparator[] cmps = null;
private IFieldAccessor[] fields = null;
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java
index ffdf9e5..68d7ad2 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java
@@ -18,6 +18,8 @@
import edu.uci.ics.hyracks.storage.am.btree.api.ISearchPredicate;
public class RangePredicate implements ISearchPredicate {
+
+ private static final long serialVersionUID = 1L;
protected boolean isForward = true;
protected byte[] lowKeys = null;
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/Int32Accessor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/Int32Accessor.java
index 6c353cf..804b2f5 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/Int32Accessor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/Int32Accessor.java
@@ -20,7 +20,9 @@
import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
public class Int32Accessor implements IFieldAccessor {
-
+
+ private static final long serialVersionUID = 1L;
+
@Override
public int getLength(byte[] data, int offset) {
return 4;
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/StringAccessor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/StringAccessor.java
index 85e10f4..243c62c 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/StringAccessor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/StringAccessor.java
@@ -25,21 +25,14 @@
import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
public class StringAccessor implements IFieldAccessor {
-
+
+ private static final long serialVersionUID = 1L;
+
@Override
public int getLength(byte[] data, int offset) {
return StringUtils.getUTFLen(data, offset) + 2;
}
-
- /*
- @Override
- public int getLength(byte[] data, int offset) {
- // TODO: this is very inefficient. We need a getInt() method that works
- ByteBuffer buf = ByteBuffer.wrap(data);
- return buf.getInt(offset) + 4; // assuming the first int indicates the length
- }
- */
-
+
@Override
public String print(byte[] data, int offset) {
ByteBuffer buf = ByteBuffer.wrap(data);