Copied hyracks trunk into fullstack

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1958 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml b/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
new file mode 100644
index 0000000..42a11dd
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
@@ -0,0 +1,84 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
+  <artifactId>btreeclient</artifactId>
+  <parent>
+    <groupId>edu.uci.ics.hyracks.examples</groupId>
+    <artifactId>btree-example</artifactId>
+    <version>0.2.2-SNAPSHOT</version>
+  </parent>
+
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-std</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-storage-am-btree</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks.examples.btree</groupId>
+  		<artifactId>btreehelper</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <configuration>
+              <programs>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.examples.btree.client.BTreeBulkLoadExample</mainClass>
+                  <name>btreebulkload</name>
+                </program>
+              </programs>
+              <repositoryLayout>flat</repositoryLayout>
+              <repositoryName>lib</repositoryName>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>assemble</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.2-beta-5</version>
+        <executions>
+          <execution>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>attached</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
new file mode 100644
index 0000000..b6e8c72
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.examples.btree.client;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.btree.helper.DataGenOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.btree.helper.IndexRegistryProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+// This example will insert tuples into the primary and secondary index using an insert pipeline
+
+public class InsertPipelineExample {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
+        public int port = 1098;
+
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
+
+        @Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
+        public String ncs;
+
+        @Option(name = "-num-tuples", usage = "Total number of tuples to to be generated for insertion", required = true)
+        public int numTuples;
+
+        @Option(name = "-primary-btreename", usage = "B-Tree file name of primary index", required = true)
+        public String primaryBTreeName;
+
+        @Option(name = "-secondary-btreename", usage = "B-Tree file name of secondary index", required = true)
+        public String secondaryBTreeName;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+        JobSpecification job = createJob(options);
+
+        long start = System.currentTimeMillis();
+        JobId jobId = hcc.startJob(options.app, job);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+
+    private static JobSpecification createJob(Options options) {
+
+        JobSpecification spec = new JobSpecification();
+
+        String[] splitNCs = options.ncs.split(",");
+
+        // schema of tuples to be generated: 4 fields with int, string, string,
+        // string
+        // we will use field 2 as primary key to fill a clustered index
+        RecordDescriptor recDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, // this field will
+                                                           // not go into B-Tree
+                UTF8StringSerializerDeserializer.INSTANCE, // we will use this
+                                                           // as payload
+                IntegerSerializerDeserializer.INSTANCE, // we will use this
+                                                        // field as key
+                IntegerSerializerDeserializer.INSTANCE, // we will use this as
+                                                        // payload
+                UTF8StringSerializerDeserializer.INSTANCE // we will use this as
+                                                          // payload
+                });
+
+        // generate numRecords records with field 2 being unique, integer values
+        // in [0, 100000], and strings with max length of 10 characters, and
+        // random seed 100
+        DataGenOperatorDescriptor dataGen = new DataGenOperatorDescriptor(spec, recDesc, options.numTuples, 2, 0,
+                100000, 10, 100);
+        // run data generator on first nodecontroller given
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dataGen, splitNCs[0]);
+
+        IIndexRegistryProvider<IIndex> indexRegistryProvider = IndexRegistryProvider.INSTANCE;
+        IStorageManagerInterface storageManager = StorageManagerInterface.INSTANCE;
+
+        // prepare insertion into primary index
+        // tuples to be put into B-Tree shall have 4 fields
+        int primaryFieldCount = 4;
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+
+        // comparator factories for primary index
+        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[1];
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+        // the B-Tree expects its keyfields to be at the front of its input
+        // tuple
+        int[] primaryFieldPermutation = { 2, 1, 3, 4 }; // map field 2 of input
+                                                        // tuple to field 0 of
+                                                        // B-Tree tuple, etc.        
+        IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
+
+        IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
+
+        // create operator descriptor
+        TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, recDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory, null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        JobHelper.createPartitionConstraint(spec, primaryInsert, splitNCs);
+
+        // prepare insertion into secondary index
+        // tuples to be put into B-Tree shall have 2 fields
+        int secondaryFieldCount = 2;
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
+        secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = IntegerPointable.TYPE_TRAITS;
+
+        // comparator factories for secondary index
+        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
+        secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+        // the B-Tree expects its keyfields to be at the front of its input
+        // tuple
+        int[] secondaryFieldPermutation = { 1, 2 };
+        IFileSplitProvider secondarySplitProvider = JobHelper.createFileSplitProvider(splitNCs,
+                options.secondaryBTreeName);
+        // create operator descriptor
+        TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, recDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+                secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory, null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        JobHelper.createPartitionConstraint(spec, secondaryInsert, splitNCs);
+
+        // end the insert pipeline at this sink operator
+        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+        JobHelper.createPartitionConstraint(spec, nullSink, splitNCs);
+
+        // distribute the records from the datagen via hashing to the bulk load
+        // ops
+        IBinaryHashFunctionFactory[] hashFactories = new IBinaryHashFunctionFactory[1];
+        hashFactories[0] = PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY);
+        IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 }, hashFactories));
+
+        // connect the ops
+
+        spec.connect(hashConn, dataGen, 0, primaryInsert, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryInsert, 0, secondaryInsert, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), secondaryInsert, 0, nullSink, 0);
+
+        spec.addRoot(nullSink);
+
+        return spec;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/JobHelper.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/JobHelper.java
new file mode 100644
index 0000000..63f07f0
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/JobHelper.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.examples.btree.client;
+
+import java.io.File;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class JobHelper {
+    public static IFileSplitProvider createFileSplitProvider(String[] splitNCs, String btreeFileName) {
+        FileSplit[] fileSplits = new FileSplit[splitNCs.length];
+        for (int i = 0; i < splitNCs.length; ++i) {
+            String fileName = btreeFileName + "." + splitNCs[i];
+            fileSplits[i] = new FileSplit(splitNCs[i], new FileReference(new File(fileName)));
+        }
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
+        return splitProvider;
+    }
+
+    public static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, String[] splitNCs) {
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, splitNCs);
+    }
+}
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
new file mode 100644
index 0000000..a6c7ea6
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.btree.client;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.btree.helper.DataGenOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.btree.helper.IndexRegistryProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+// This example will load a primary index from randomly generated data
+
+public class PrimaryIndexBulkLoadExample {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
+        public int port = 1098;
+
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
+
+        @Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
+        public String ncs;
+
+        @Option(name = "-num-tuples", usage = "Total number of tuples to to be generated for loading", required = true)
+        public int numTuples;
+
+        @Option(name = "-btreename", usage = "B-Tree file name", required = true)
+        public String btreeName;
+
+        @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
+        public int sbSize = 32768;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+        JobSpecification job = createJob(options);
+
+        long start = System.currentTimeMillis();
+        JobId jobId = hcc.startJob(options.app, job);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+
+    private static JobSpecification createJob(Options options) {
+
+        JobSpecification spec = new JobSpecification();
+
+        String[] splitNCs = options.ncs.split(",");
+
+        // schema of tuples to be generated: 5 fields with string, string, int,
+        // int, string
+        // we will use field-index 2 as primary key to fill a clustered index
+        RecordDescriptor recDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, // this field will
+                                                           // not go into B-Tree
+                UTF8StringSerializerDeserializer.INSTANCE, // we will use this
+                                                           // as payload
+                IntegerSerializerDeserializer.INSTANCE, // we will use this
+                                                        // field as key
+                IntegerSerializerDeserializer.INSTANCE, // we will use this as
+                                                        // payload
+                UTF8StringSerializerDeserializer.INSTANCE // we will use this as
+                                                          // payload
+                });
+
+        // generate numRecords records with field 2 being unique, integer values
+        // in [0, 100000], and strings with max length of 10 characters, and
+        // random seed 50
+        DataGenOperatorDescriptor dataGen = new DataGenOperatorDescriptor(spec, recDesc, options.numTuples, 2, 0,
+                100000, 10, 50);
+        // run data generator on first nodecontroller given
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dataGen, splitNCs[0]);
+
+        // sort the tuples as preparation for bulk load
+        // fields to sort on
+        int[] sortFields = { 2 };
+        // comparators for sort fields
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, options.sbSize, sortFields,
+                comparatorFactories, recDesc);
+        JobHelper.createPartitionConstraint(spec, sorter, splitNCs);
+
+        // tuples to be put into B-Tree shall have 4 fields
+        int fieldCount = 4;
+        ITypeTraits[] typeTraits = new ITypeTraits[fieldCount];
+        typeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        typeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        typeTraits[2] = IntegerPointable.TYPE_TRAITS;
+        typeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+
+        // create providers for B-Tree
+        IIndexRegistryProvider<IIndex> indexRegistryProvider = IndexRegistryProvider.INSTANCE;
+        IStorageManagerInterface storageManager = StorageManagerInterface.INSTANCE;
+
+        // the B-Tree expects its keyfields to be at the front of its input
+        // tuple
+        int[] fieldPermutation = { 2, 1, 3, 4 }; // map field 2 of input tuple
+                                                 // to field 0 of B-Tree tuple,
+                                                 // etc.
+        IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManager, indexRegistryProvider, btreeSplitProvider, typeTraits, comparatorFactories,
+                fieldPermutation, 0.7f, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
+
+        // distribute the records from the datagen via hashing to the bulk load
+        // ops
+        IBinaryHashFunctionFactory[] hashFactories = new IBinaryHashFunctionFactory[1];
+        hashFactories[0] = PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY);
+        IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 }, hashFactories));
+
+        spec.connect(hashConn, dataGen, 0, sorter, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+
+        spec.addRoot(btreeBulkLoad);
+
+        return spec;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
new file mode 100644
index 0000000..d24ba33
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.btree.client;
+
+import java.io.DataOutput;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.btree.helper.IndexRegistryProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+// This example will perform an ordered scan on the primary index
+// i.e. a range-search for [-infinity, +infinity]
+
+public class PrimaryIndexSearchExample {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
+        public int port = 1098;
+
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
+
+        @Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
+        public String ncs;
+
+        @Option(name = "-btreename", usage = "B-Tree file name to search", required = true)
+        public String btreeName;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+        JobSpecification job = createJob(options);
+
+        long start = System.currentTimeMillis();
+        JobId jobId = hcc.startJob(options.app, job);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+
+    private static JobSpecification createJob(Options options) throws HyracksDataException {
+
+        JobSpecification spec = new JobSpecification();
+
+        String[] splitNCs = options.ncs.split(",");
+
+        int fieldCount = 4;
+        ITypeTraits[] typeTraits = new ITypeTraits[fieldCount];
+        typeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        typeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        typeTraits[2] = IntegerPointable.TYPE_TRAITS;
+        typeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+
+        // comparators for btree
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+        // create roviders for B-Tree
+        IIndexRegistryProvider<IIndex> indexRegistryProvider = IndexRegistryProvider.INSTANCE;
+        IStorageManagerInterface storageManager = StorageManagerInterface.INSTANCE;
+
+        // schema of tuples coming out of primary index
+        RecordDescriptor recDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, });
+
+        // build tuple containing low and high search keys
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(comparatorFactories.length * 2); // high
+                                                                                      // key
+                                                                                      // and
+                                                                                      // low
+                                                                                      // key
+        DataOutput dos = tb.getDataOutput();
+
+        tb.reset();
+        IntegerSerializerDeserializer.INSTANCE.serialize(100, dos); // low key
+        tb.addFieldEndOffset();
+        IntegerSerializerDeserializer.INSTANCE.serialize(200, dos); // build
+                                                                    // high key
+        tb.addFieldEndOffset();
+
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        JobHelper.createPartitionConstraint(spec, keyProviderOp, splitNCs);
+
+        int[] lowKeyFields = { 0 }; // low key is in field 0 of tuples going
+                                    // into search op
+        int[] highKeyFields = { 1 }; // low key is in field 1 of tuples going
+                                     // into search op
+
+        IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
+        BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(spec, recDesc, storageManager,
+                indexRegistryProvider, btreeSplitProvider, typeTraits, comparatorFactories, lowKeyFields,
+                highKeyFields, true, true, dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
+        JobHelper.createPartitionConstraint(spec, btreeSearchOp, splitNCs);
+
+        // have each node print the results of its respective B-Tree
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        JobHelper.createPartitionConstraint(spec, printer, splitNCs);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, btreeSearchOp, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), btreeSearchOp, 0, printer, 0);
+
+        spec.addRoot(printer);
+
+        return spec;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
new file mode 100644
index 0000000..5aa338a
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.btree.client;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.btree.helper.IndexRegistryProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDiskOrderScanOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+// This example will load a secondary index with <key, primary-index key> pairs
+// We require an existing primary index built with PrimaryIndexBulkLoadExample
+
+public class SecondaryIndexBulkLoadExample {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
+        public int port = 1098;
+
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
+
+        @Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
+        public String ncs;
+
+        @Option(name = "-primary-btreename", usage = "Name of primary-index B-Tree to load from", required = true)
+        public String primaryBTreeName;
+
+        @Option(name = "-secondary-btreename", usage = "B-Tree file name for secondary index to be built", required = true)
+        public String secondaryBTreeName;
+
+        @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
+        public int sbSize = 32768;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+        JobSpecification job = createJob(options);
+
+        long start = System.currentTimeMillis();
+        JobId jobId = hcc.startJob(options.app, job);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+
+    private static JobSpecification createJob(Options options) {
+
+        JobSpecification spec = new JobSpecification();
+
+        String[] splitNCs = options.ncs.split(",");
+
+        IIndexRegistryProvider<IIndex> indexRegistryProvider = IndexRegistryProvider.INSTANCE;
+        IStorageManagerInterface storageManager = StorageManagerInterface.INSTANCE;
+
+        // schema of tuples that we are retrieving from the primary index
+        RecordDescriptor recDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, // we will use this as
+                                                        // payload in secondary
+                                                        // index
+                UTF8StringSerializerDeserializer.INSTANCE, // we will use this
+                                                           // ask key in
+                                                           // secondary index
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        int primaryFieldCount = 4;
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+
+        // comparators for sort fields and BTree fields
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+        comparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        comparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+        // use a disk-order scan to read primary index
+        IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
+        TreeIndexDiskOrderScanOperatorDescriptor btreeScanOp = new TreeIndexDiskOrderScanOperatorDescriptor(spec,
+                recDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        JobHelper.createPartitionConstraint(spec, btreeScanOp, splitNCs);
+
+        // sort the tuples as preparation for bulk load into secondary index
+        // fields to sort on
+        int[] sortFields = { 1, 0 };
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, options.sbSize, sortFields,
+                comparatorFactories, recDesc);
+        JobHelper.createPartitionConstraint(spec, sorter, splitNCs);
+
+        // tuples to be put into B-Tree shall have 2 fields
+        int secondaryFieldCount = 2;
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
+        secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = IntegerPointable.TYPE_TRAITS;
+
+        // the B-Tree expects its keyfields to be at the front of its input
+        // tuple
+        int[] fieldPermutation = { 1, 0 };
+        IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManager, indexRegistryProvider, btreeSplitProvider, secondaryTypeTraits, comparatorFactories,
+                fieldPermutation, 0.7f, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
+
+        // connect the ops
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), btreeScanOp, 0, sorter, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+
+        spec.addRoot(btreeBulkLoad);
+
+        return spec;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
new file mode 100644
index 0000000..277668b
--- /dev/null
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.btree.client;
+
+import java.io.DataOutput;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.btree.helper.IndexRegistryProvider;
+import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+// This example will perform range search on the secondary index
+// and then retrieve the corresponding source records from the primary index
+
+public class SecondaryIndexSearchExample {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
+        public int port = 1098;
+
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
+
+        @Option(name = "-target-ncs", usage = "Comma separated list of node-controller names to use", required = true)
+        public String ncs;
+
+        @Option(name = "-primary-btreename", usage = "Primary B-Tree file name", required = true)
+        public String primaryBTreeName;
+
+        @Option(name = "-secondary-btreename", usage = "Secondary B-Tree file name to search", required = true)
+        public String secondaryBTreeName;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+        JobSpecification job = createJob(options);
+
+        long start = System.currentTimeMillis();
+        JobId jobId = hcc.startJob(options.app, job);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+
+    private static JobSpecification createJob(Options options) throws HyracksDataException {
+
+        JobSpecification spec = new JobSpecification();
+
+        String[] splitNCs = options.ncs.split(",");
+
+        IIndexRegistryProvider<IIndex> indexRegistryProvider = IndexRegistryProvider.INSTANCE;
+        IStorageManagerInterface storageManager = StorageManagerInterface.INSTANCE;
+
+        // schema of tuples coming out of secondary index
+        RecordDescriptor secondaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        int secondaryFieldCount = 2;
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
+        secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = IntegerPointable.TYPE_TRAITS;
+
+        // comparators for sort fields and BTree fields
+        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
+        secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+        // comparators for primary index
+        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[1];
+        primaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+        // schema of tuples coming out of primary index
+        RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, });
+
+        int primaryFieldCount = 4;
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+
+        // comparators for btree, note that we only need a comparator for the
+        // non-unique key
+        // i.e. we will have a range condition on the first field only (implying
+        // [-infinity, +infinity] for the second field)
+        IBinaryComparatorFactory[] searchComparatorFactories = new IBinaryComparatorFactory[1];
+        searchComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+
+        // build tuple containing low and high search keys
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(searchComparatorFactories.length * 2); // low
+        // and
+        // high
+        // key
+        DataOutput dos = tb.getDataOutput();
+
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos); // low
+                                                                       // key
+        tb.addFieldEndOffset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("f", dos); // high
+                                                                       // key
+        tb.addFieldEndOffset();
+
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        JobHelper.createPartitionConstraint(spec, keyProviderOp, splitNCs);
+
+        int[] secondaryLowKeyFields = { 0 }; // low key is in field 0 of tuples
+                                             // going into secondary index
+                                             // search op
+        int[] secondaryHighKeyFields = { 1 }; // high key is in field 1 of
+                                              // tuples going into secondary
+                                              // index search op
+
+        IFileSplitProvider secondarySplitProvider = JobHelper.createFileSplitProvider(splitNCs,
+                options.secondaryBTreeName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
+        BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+                storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+                searchComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+                dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
+        JobHelper.createPartitionConstraint(spec, secondarySearchOp, splitNCs);
+
+        // secondary index will output tuples with [UTF8String, Integer]
+        // the Integer field refers to the key in the primary index of the
+        // source data records
+        int[] primaryLowKeyFields = { 1 }; // low key is in field 0 of tuples
+                                           // going into primary index search op
+        int[] primaryHighKeyFields = { 1 }; // high key is in field 1 of tuples
+                                            // going into primary index search
+                                            // op
+
+        IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
+        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
+                dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
+        JobHelper.createPartitionConstraint(spec, primarySearchOp, splitNCs);
+
+        // have each node print the results of its respective B-Tree
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        JobHelper.createPartitionConstraint(spec, printer, splitNCs);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondarySearchOp, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), secondarySearchOp, 0, primarySearchOp, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, printer, 0);
+
+        spec.addRoot(printer);
+
+        return spec;
+    }
+}
\ No newline at end of file