added BTreeFileEnlistment op to register existing files as BTrees in the system
git-svn-id: https://hyracks.googlecode.com/svn/trunk@182 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexEnlistFilesExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexEnlistFilesExample.java
new file mode 100644
index 0000000..a1e4fbc
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexEnlistFilesExample.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.btree.client;
+
+import java.util.UUID;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.BTreeRegistryProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.BufferCacheProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.FileMappingProviderProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeFileEnlistmentOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.IBTreeRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.IBufferCacheProvider;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.IFileMappingProviderProvider;
+import edu.uci.ics.hyracks.storage.am.btree.frames.NSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.NSMLeafFrameFactory;
+
+// This example will enlist existing files as primary index
+
+public class PrimaryIndexEnlistFilesExample {
+ private static class Options {
+ @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+ public String host;
+
+ @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+ public int port = 1099;
+
+ @Option(name = "-app", usage = "Hyracks Application name", required = true)
+ public String app;
+
+ @Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
+ public String ncs;
+
+ @Option(name = "-btreename", usage = "B-Tree file name", required = true)
+ public String btreeName;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+
+ IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+
+ JobSpecification job = createJob(options);
+
+ long start = System.currentTimeMillis();
+ UUID jobId = hcc.createJob(options.app, job);
+ hcc.start(jobId);
+ hcc.waitForCompletion(jobId);
+ long end = System.currentTimeMillis();
+ System.err.println(start + " " + end + " " + (end - start));
+ }
+
+ private static JobSpecification createJob(Options options) {
+
+ JobSpecification spec = new JobSpecification();
+
+ String[] splitNCs = options.ncs.split(",");
+
+ // schema of tuples in existing files (see PrimaryIndexBulkLoadExample)
+ RecordDescriptor recDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE
+ });
+
+ // create factories and providers for B-Tree
+ IBTreeInteriorFrameFactory interiorFrameFactory = new NSMInteriorFrameFactory();
+ IBTreeLeafFrameFactory leafFrameFactory = new NSMLeafFrameFactory();
+ IBufferCacheProvider bufferCacheProvider = BufferCacheProvider.INSTANCE;
+ IBTreeRegistryProvider btreeRegistryProvider = BTreeRegistryProvider.INSTANCE;
+ IFileMappingProviderProvider fileMappingProviderProvider = FileMappingProviderProvider.INSTANCE;
+
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
+
+ IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
+ BTreeFileEnlistmentOperatorDescriptor fileEnlistmentOp = new BTreeFileEnlistmentOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, btreeSplitProvider, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, recDesc.getFields().length, comparatorFactories);
+ PartitionConstraint fileEnlistmentConstraint = JobHelper.createPartitionConstraint(splitNCs);
+ fileEnlistmentOp.setPartitionConstraint(fileEnlistmentConstraint);
+
+ spec.addRoot(fileEnlistmentOp);
+
+ return spec;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
index 3c84796..6dfd76b 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -38,7 +38,7 @@
public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
- btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, true);
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.CREATE_BTREE);
this.fillFactor = fillFactor;
this.recordDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
index 50b4e94..ebe1a9b 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -35,7 +35,7 @@
private final BTreeOpHelper btreeOpHelper;
public BTreeDiskOrderScanOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int partition) {
- btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, false);
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.OPEN_BTREE);
}
@Override
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorDescriptor.java
new file mode 100644
index 0000000..035a530
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorDescriptor.java
@@ -0,0 +1,46 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+
+// re-create in-memory state for a btree that has already been built (i.e., the file exists):
+// 1. register files in file manager (FileManager)
+// 2. create file mappings (FileMappingProvider)
+// 3. register btree instance (BTreeRegistry)
+
+public class BTreeFileEnlistmentOperatorDescriptor extends AbstractBTreeOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public BTreeFileEnlistmentOperatorDescriptor(JobSpecification spec,
+ RecordDescriptor recDesc,
+ IBufferCacheProvider bufferCacheProvider,
+ IBTreeRegistryProvider btreeRegistryProvider,
+ IFileSplitProvider fileSplitProvider,
+ IFileMappingProviderProvider fileMappingProviderProvider,
+ IBTreeInteriorFrameFactory interiorFactory,
+ IBTreeLeafFrameFactory leafFactory, int fieldCount,
+ IBinaryComparatorFactory[] comparatorFactories) {
+ super(spec, 0, 0, recDesc, bufferCacheProvider,
+ btreeRegistryProvider, fileSplitProvider, fileMappingProviderProvider,
+ interiorFactory, leafFactory, fieldCount, comparatorFactories);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx,
+ IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition,
+ int partitions) throws HyracksDataException {
+ return new BTreeFileEnlistmentOperatorNodePushable(this, ctx, partition);
+ }
+
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java
new file mode 100644
index 0000000..cd0b4b3
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+
+public class BTreeFileEnlistmentOperatorNodePushable extends AbstractOperatorNodePushable {
+
+ private final BTreeOpHelper btreeOpHelper;
+
+ public BTreeFileEnlistmentOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int partition) {
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.ENLIST_BTREE);
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ }
+
+ @Override
+ public int getInputArity() {
+ return 0;
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return null;
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ btreeOpHelper.init();
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer,
+ RecordDescriptor recordDesc) {
+ }
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
index a272d56..0690dbf 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
@@ -47,7 +47,7 @@
public BTreeInsertUpdateDeleteOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, BTreeOp op) {
- btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, false);
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.OPEN_BTREE);
this.recordDescProvider = recordDescProvider;
this.op = op;
tuple.setFieldPermutation(fieldPermutation);
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
index 277938d..db4595a 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
@@ -32,7 +32,14 @@
import edu.uci.ics.hyracks.storage.common.file.FileManager;
final class BTreeOpHelper {
- private IBTreeInteriorFrame interiorFrame;
+
+ public enum BTreeMode {
+ OPEN_BTREE,
+ CREATE_BTREE,
+ ENLIST_BTREE
+ }
+
+ private IBTreeInteriorFrame interiorFrame;
private IBTreeLeafFrame leafFrame;
private BTree btree;
@@ -42,12 +49,12 @@
private AbstractBTreeOperatorDescriptor opDesc;
private IHyracksContext ctx;
- private boolean createBTree;
+ private BTreeMode mode;
- BTreeOpHelper(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx, int partition, boolean createBTree) {
+ BTreeOpHelper(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx, int partition, BTreeMode mode) {
this.opDesc = opDesc;
this.ctx = ctx;
- this.createBTree = createBTree;
+ this.mode = mode;
this.partition = partition;
}
@@ -72,26 +79,43 @@
String fileName = f.getAbsolutePath();
Integer fileId = fileMappingProviderProvider.getFileMappingProvider().getFileId(fileName);
- if(fileId == null) {
- if(createBTree) {
- fileId = fileMappingProviderProvider.getFileMappingProvider().mapNameToFileId(fileName, createBTree);
- }
- else {
- throw new HyracksDataException("Cannot get id for file " + fileName + ". File name has not been mapped.");
- }
- }
- else {
- if(createBTree) {
- throw new HyracksDataException("Cannot map file " + fileName + " to an id. File name has already been mapped.");
- }
- }
- btreeFileId = fileId;
- if (!f.exists() && !createBTree) {
- throw new HyracksDataException("Trying to open btree from file " + fileName + " but file doesn't exist.");
- }
+ switch(mode) {
+
+ case OPEN_BTREE: {
+ if(fileId == null) {
+ throw new HyracksDataException("Cannot get id for file " + fileName + ". File name has not been mapped.");
+ }
+ if(!f.exists()) {
+ throw new HyracksDataException("Trying to open btree from file " + fileName + " but file doesn't exist.");
+ }
+ } break;
- if(createBTree) {
+ case CREATE_BTREE: {
+ if(fileId == null) {
+ fileId = fileMappingProviderProvider.getFileMappingProvider().mapNameToFileId(fileName, true);
+ }
+ else {
+ throw new HyracksDataException("Cannot map file " + fileName + " to an id. File name has already been mapped.");
+ }
+ } break;
+
+ case ENLIST_BTREE: {
+ if(fileId == null) {
+ fileId = fileMappingProviderProvider.getFileMappingProvider().mapNameToFileId(fileName, true);
+ }
+ else {
+ throw new HyracksDataException("Cannot map file " + fileName + " to an id. File name has already been mapped.");
+ }
+ if(!f.exists()) {
+ throw new HyracksDataException("Trying to enlist btree from file " + fileName + " but file doesn't exist.");
+ }
+ } break;
+ }
+
+ btreeFileId = fileId;
+
+ if(mode == BTreeMode.CREATE_BTREE || mode == BTreeMode.ENLIST_BTREE) {
FileInfo fi = new FileInfo(btreeFileId, raf);
fileManager.registerFile(fi);
}
@@ -119,7 +143,7 @@
MultiComparator cmp = new MultiComparator(opDesc.getFieldCount(), comparators);
btree = new BTree(bufferCache, opDesc.getInteriorFactory(), opDesc.getLeafFactory(), cmp);
- if (createBTree) {
+ if (mode == BTreeMode.CREATE_BTREE) {
MetaDataFrame metaFrame = new MetaDataFrame();
try {
btree.create(btreeFileId, leafFrame, metaFrame);
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 391659d..67e0c3b 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -59,7 +59,7 @@
private RecordDescriptor recDesc;
public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward, int[] lowKeyFields, int[] highKeyFields) {
- btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, false);
+ btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.OPEN_BTREE);
this.isForward = isForward;
this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
if(lowKeyFields != null && lowKeyFields.length > 0) {