added BTreeFileEnlistment op to register existing files as BTrees in the system

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@182 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexEnlistFilesExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexEnlistFilesExample.java
new file mode 100644
index 0000000..a1e4fbc
--- /dev/null
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexEnlistFilesExample.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.btree.client;
+
+import java.util.UUID;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.BTreeRegistryProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.BufferCacheProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.FileMappingProviderProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeFileEnlistmentOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.IBTreeRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.IBufferCacheProvider;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.IFileMappingProviderProvider;
+import edu.uci.ics.hyracks.storage.am.btree.frames.NSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.NSMLeafFrameFactory;
+
+// This example will enlist existing files as primary index
+
+public class PrimaryIndexEnlistFilesExample {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        public int port = 1099;
+        
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
+        
+        @Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
+        public String ncs;
+        
+        @Option(name = "-btreename", usage = "B-Tree file name", required = true)
+        public String btreeName;               
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+
+        JobSpecification job = createJob(options);
+
+        long start = System.currentTimeMillis();
+        UUID jobId = hcc.createJob(options.app, job);
+        hcc.start(jobId);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+    
+    private static JobSpecification createJob(Options options) {
+    	
+    	JobSpecification spec = new JobSpecification();
+
+    	String[] splitNCs = options.ncs.split(",");
+    	
+    	// schema of tuples in existing files (see PrimaryIndexBulkLoadExample)
+        RecordDescriptor recDesc = new RecordDescriptor(new ISerializerDeserializer[] {                                    
+                IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE
+                });
+        
+        // create factories and providers for B-Tree
+        IBTreeInteriorFrameFactory interiorFrameFactory = new NSMInteriorFrameFactory();
+        IBTreeLeafFrameFactory leafFrameFactory = new NSMLeafFrameFactory();        
+        IBufferCacheProvider bufferCacheProvider = BufferCacheProvider.INSTANCE;
+        IBTreeRegistryProvider btreeRegistryProvider = BTreeRegistryProvider.INSTANCE;
+        IFileMappingProviderProvider fileMappingProviderProvider = FileMappingProviderProvider.INSTANCE;
+        
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
+        
+        IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
+        BTreeFileEnlistmentOperatorDescriptor fileEnlistmentOp = new BTreeFileEnlistmentOperatorDescriptor(spec, recDesc, bufferCacheProvider, btreeRegistryProvider, btreeSplitProvider, fileMappingProviderProvider, interiorFrameFactory, leafFrameFactory, recDesc.getFields().length, comparatorFactories);
+        PartitionConstraint fileEnlistmentConstraint = JobHelper.createPartitionConstraint(splitNCs);
+        fileEnlistmentOp.setPartitionConstraint(fileEnlistmentConstraint);                                            
+        
+        spec.addRoot(fileEnlistmentOp);
+        
+    	return spec;
+    }    
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
index 3c84796..6dfd76b 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -38,7 +38,7 @@
     
     public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
             int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
-        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, true);
+        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.CREATE_BTREE);
         this.fillFactor = fillFactor;
         this.recordDescProvider = recordDescProvider;
         tuple.setFieldPermutation(fieldPermutation);
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
index 50b4e94..ebe1a9b 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -35,7 +35,7 @@
     private final BTreeOpHelper btreeOpHelper;
 
     public BTreeDiskOrderScanOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int partition) {
-        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, false);
+        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.OPEN_BTREE);
     }
 
     @Override
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorDescriptor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorDescriptor.java
new file mode 100644
index 0000000..035a530
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorDescriptor.java
@@ -0,0 +1,46 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+
+// re-create in-memory state for a btree that has already been built (i.e., the file exists):
+// 1. register files in file manager (FileManager)
+// 2. create file mappings (FileMappingProvider)
+// 3. register btree instance (BTreeRegistry)
+
+public class BTreeFileEnlistmentOperatorDescriptor extends AbstractBTreeOperatorDescriptor {
+
+	private static final long serialVersionUID = 1L;
+	
+	public BTreeFileEnlistmentOperatorDescriptor(JobSpecification spec,
+			RecordDescriptor recDesc,
+			IBufferCacheProvider bufferCacheProvider,
+			IBTreeRegistryProvider btreeRegistryProvider,
+			IFileSplitProvider fileSplitProvider,
+			IFileMappingProviderProvider fileMappingProviderProvider,
+			IBTreeInteriorFrameFactory interiorFactory,
+			IBTreeLeafFrameFactory leafFactory, int fieldCount,
+			IBinaryComparatorFactory[] comparatorFactories) {
+		super(spec, 0, 0, recDesc, bufferCacheProvider,
+				btreeRegistryProvider, fileSplitProvider, fileMappingProviderProvider,
+				interiorFactory, leafFactory, fieldCount, comparatorFactories);		
+	}
+	
+	@Override
+	public IOperatorNodePushable createPushRuntime(IHyracksContext ctx,
+			IOperatorEnvironment env,
+			IRecordDescriptorProvider recordDescProvider, int partition,
+			int partitions) throws HyracksDataException {
+		return new BTreeFileEnlistmentOperatorNodePushable(this, ctx, partition);
+	}
+	
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java
new file mode 100644
index 0000000..cd0b4b3
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+
+public class BTreeFileEnlistmentOperatorNodePushable extends AbstractOperatorNodePushable {
+	
+	private final BTreeOpHelper btreeOpHelper;
+	
+	public BTreeFileEnlistmentOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int partition) {
+		btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.ENLIST_BTREE);
+	}
+
+	@Override
+	public void deinitialize() throws HyracksDataException {
+	}
+
+	@Override
+	public int getInputArity() {		
+		return 0;
+	}
+
+	@Override
+	public IFrameWriter getInputFrameWriter(int index) {
+		return null;
+	}
+
+	@Override
+	public void initialize() throws HyracksDataException {
+		btreeOpHelper.init();		
+	}
+
+	@Override
+	public void setOutputFrameWriter(int index, IFrameWriter writer,
+			RecordDescriptor recordDesc) {	
+	}	
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
index a272d56..0690dbf 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
@@ -47,7 +47,7 @@
     
     public BTreeInsertUpdateDeleteOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx,
     		int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, BTreeOp op) {
-        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, false);
+        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.OPEN_BTREE);
         this.recordDescProvider = recordDescProvider;
         this.op = op;
         tuple.setFieldPermutation(fieldPermutation);
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
index 277938d..db4595a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
@@ -32,7 +32,14 @@
 import edu.uci.ics.hyracks.storage.common.file.FileManager;
 
 final class BTreeOpHelper {
-    private IBTreeInteriorFrame interiorFrame;
+    
+	public enum BTreeMode {
+		OPEN_BTREE,
+		CREATE_BTREE,
+		ENLIST_BTREE
+	}
+	
+	private IBTreeInteriorFrame interiorFrame;
     private IBTreeLeafFrame leafFrame;
 
     private BTree btree;
@@ -42,12 +49,12 @@
     private AbstractBTreeOperatorDescriptor opDesc;
     private IHyracksContext ctx;
 
-    private boolean createBTree;
+    private BTreeMode mode;
     
-    BTreeOpHelper(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx, int partition, boolean createBTree) {
+    BTreeOpHelper(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx, int partition, BTreeMode mode) {
         this.opDesc = opDesc;
         this.ctx = ctx;
-        this.createBTree = createBTree;
+        this.mode = mode;
         this.partition = partition;
     }  
     
@@ -72,26 +79,43 @@
         
         String fileName = f.getAbsolutePath();
         Integer fileId = fileMappingProviderProvider.getFileMappingProvider().getFileId(fileName);        
-        if(fileId == null) {
-        	if(createBTree) {
-        		fileId = fileMappingProviderProvider.getFileMappingProvider().mapNameToFileId(fileName, createBTree);        		
-        	}
-        	else {
-        		throw new HyracksDataException("Cannot get id for file " + fileName + ". File name has not been mapped.");
-        	}
-        }
-        else {
-        	if(createBTree) {
-        		throw new HyracksDataException("Cannot map file " + fileName + " to an id. File name has already been mapped.");
-        	}        	     
-        }        
-        btreeFileId = fileId;  
         
-        if (!f.exists() && !createBTree) {
-            throw new HyracksDataException("Trying to open btree from file " + fileName + " but file doesn't exist.");
-        }
+        switch(mode) {
+    	
+    	case OPEN_BTREE: {
+    		if(fileId == null) {
+    			throw new HyracksDataException("Cannot get id for file " + fileName + ". File name has not been mapped.");
+    		}
+    		if(!f.exists()) {
+    			throw new HyracksDataException("Trying to open btree from file " + fileName + " but file doesn't exist.");
+    		}
+    	} break;
         
-        if(createBTree) {
+    	case CREATE_BTREE: {
+    		if(fileId == null) {
+    			fileId = fileMappingProviderProvider.getFileMappingProvider().mapNameToFileId(fileName, true);
+    		}
+    		else {
+    			throw new HyracksDataException("Cannot map file " + fileName + " to an id. File name has already been mapped.");
+    		}    		
+    	} break;
+        
+    	case ENLIST_BTREE: {
+    		if(fileId == null) {
+    			fileId = fileMappingProviderProvider.getFileMappingProvider().mapNameToFileId(fileName, true);
+    		}
+    		else {
+    			throw new HyracksDataException("Cannot map file " + fileName + " to an id. File name has already been mapped.");
+    		}    		
+    		if(!f.exists()) {
+    			throw new HyracksDataException("Trying to enlist btree from file " + fileName + " but file doesn't exist.");
+    		}
+    	} break;
+        }
+    	
+    	btreeFileId = fileId;  
+    	
+        if(mode == BTreeMode.CREATE_BTREE || mode == BTreeMode.ENLIST_BTREE) {
         	FileInfo fi = new FileInfo(btreeFileId, raf);
         	fileManager.registerFile(fi);
         }
@@ -119,7 +143,7 @@
                     MultiComparator cmp = new MultiComparator(opDesc.getFieldCount(), comparators);
                     
                     btree = new BTree(bufferCache, opDesc.getInteriorFactory(), opDesc.getLeafFactory(), cmp);
-                    if (createBTree) {
+                    if (mode == BTreeMode.CREATE_BTREE) {
                         MetaDataFrame metaFrame = new MetaDataFrame();
                         try {
 							btree.create(btreeFileId, leafFrame, metaFrame);
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 391659d..67e0c3b 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -59,7 +59,7 @@
     private RecordDescriptor recDesc;       
         
     public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward, int[] lowKeyFields, int[] highKeyFields) {
-        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, false);
+        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.OPEN_BTREE);
         this.isForward = isForward;        
         this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);        
         if(lowKeyFields != null && lowKeyFields.length > 0) {