add Pregelix codebase

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix-core/src/main/assembly/binary-assembly.xml b/pregelix-core/src/main/assembly/binary-assembly.xml
new file mode 100755
index 0000000..0500499
--- /dev/null
+++ b/pregelix-core/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
new file mode 100644
index 0000000..0a169e0
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.pregelix.core.base;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+public interface IDriver {
+
+    public static enum Plan {
+        INNER_JOIN, OUTER_JOIN, OUTER_JOIN_SORT, OUTER_JOIN_SINGLE_SORT
+    }
+
+    public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
+            throws HyracksException;
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
new file mode 100644
index 0000000..2a91e0b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.base;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public interface IJobGen {
+
+    public JobSpecification generateCreatingJob() throws HyracksException;
+
+    public JobSpecification generateLoadingJob() throws HyracksException;
+
+    public JobSpecification generateJob(int iteration) throws HyracksException;
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
new file mode 100644
index 0000000..d839f2a
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.pregelix.core.base;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public interface INormalizedKeyComputerFactoryProvider {
+
+    @SuppressWarnings("rawtypes")
+    INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory(Class keyClass);
+
+    @SuppressWarnings("rawtypes")
+    INormalizedKeyComputerFactory getDescINormalizedKeyComputerFactory(Class keyClass);
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/data/TypeTraits.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/data/TypeTraits.java
new file mode 100644
index 0000000..8cb2a50
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/data/TypeTraits.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.pregelix.core.data;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+
+public class TypeTraits implements ITypeTraits {
+
+    private static final long serialVersionUID = 1L;
+    private final int length;
+    private final boolean isFixedLength;
+
+    public TypeTraits(boolean isFixedLength) {
+        this.isFixedLength = isFixedLength;
+        this.length = 0;
+    }
+
+    public TypeTraits(int length) {
+        this.isFixedLength = true;
+        this.length = length;
+    }
+
+    @Override
+    public boolean isFixedLength() {
+        return isFixedLength;
+    }
+
+    @Override
+    public int getFixedLength() {
+        return length;
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
new file mode 100644
index 0000000..7076f7f
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -0,0 +1,220 @@
+package edu.uci.ics.pregelix.core.driver;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+@SuppressWarnings("rawtypes")
+public class Driver implements IDriver {
+    private static final Log LOG = LogFactory.getLog(Driver.class);
+    private static String LIB = "lib";
+    private JobGen jobGen;
+    private PregelixJob job;
+    private boolean profiling;
+
+    private String applicationName;
+    private String GIRAPH_HOME = "GIRAPH_HOME";
+    private IHyracksClientConnection hcc;
+
+    private Class exampleClass;
+
+    public Driver(Class exampleClass) {
+        this.exampleClass = exampleClass;
+    }
+
+    @Override
+    public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
+            throws HyracksException {
+        LOG.info("job started");
+        long start = System.currentTimeMillis();
+        long end = start;
+        long time = 0;
+
+        this.job = job;
+        this.profiling = profiling;
+        try {
+            switch (planChoice) {
+                case INNER_JOIN:
+                    jobGen = new JobGenInnerJoin(job);
+                    break;
+                case OUTER_JOIN:
+                    jobGen = new JobGenOuterJoin(job);
+                    break;
+                case OUTER_JOIN_SORT:
+                    jobGen = new JobGenOuterJoinSort(job);
+                    break;
+                case OUTER_JOIN_SINGLE_SORT:
+                    jobGen = new JobGenOuterJoinSingleSort(job);
+                    break;
+                default:
+                    jobGen = new JobGenInnerJoin(job);
+            }
+
+            if (hcc == null)
+                hcc = new HyracksConnection(ipAddress, port);
+            ClusterConfig.loadClusterConfig(ipAddress, port);
+
+            URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
+            URL[] urls = classLoader.getURLs();
+            String jarFile = "";
+            for (URL url : urls)
+                if (url.toString().endsWith(".jar"))
+                    jarFile = url.getPath();
+
+            installApplication(jarFile);
+            FileSystem dfs = FileSystem.get(job.getConfiguration());
+            dfs.delete(FileOutputFormat.getOutputPath(job), true);
+
+            runCreate(jobGen);
+            runDataLoad(jobGen);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("data loading finished " + time + "ms");
+            int i = 1;
+            boolean terminate = false;
+            do {
+                start = System.currentTimeMillis();
+                runLoopBodyIteration(jobGen, i);
+                end = System.currentTimeMillis();
+                time = end - start;
+                LOG.info("iteration " + i + " finished " + time + "ms");
+                terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
+                i++;
+            } while (!terminate);
+
+            start = System.currentTimeMillis();
+            runHDFSWRite(jobGen);
+            runCleanup(jobGen);
+            destroyApplication(applicationName);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("result writing finished " + time + "ms");
+            LOG.info("job finished");
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    private void runCreate(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
+            execute(treeCreateSpec);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runDataLoad(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
+            execute(bulkLoadJobSpec);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
+        try {
+            JobSpecification loopBody = jobGen.generateJob(iteration);
+            execute(loopBody);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runHDFSWRite(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
+            execute(scanSortPrintJobSpec);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runCleanup(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification[] cleanups = jobGen.generateCleanup();
+            runJobArray(cleanups);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runJobArray(JobSpecification[] jobs) throws Exception {
+        for (JobSpecification job : jobs) {
+            execute(job);
+        }
+    }
+
+    private void execute(JobSpecification job) throws Exception {
+        JobId jobId = hcc.startJob(applicationName, job,
+                profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+    }
+
+    public void installApplication(String jarFile) throws Exception {
+        System.out.println(jarFile);
+        applicationName = job.getJobName() + new UUID(System.currentTimeMillis(), System.nanoTime());
+        String home = System.getProperty(GIRAPH_HOME);
+        if (home == null)
+            home = "./";
+        String libDir = home + LIB;
+        File dir = new File(libDir);
+        if (!dir.isDirectory()) {
+            throw new HyracksException(libDir + " is not a directory!");
+        }
+        System.out.println(dir.getAbsolutePath());
+        File[] libJars = dir.listFiles(new FileFilter("jar"));
+        Set<String> allJars = new TreeSet<String>();
+        allJars.add(jarFile);
+        for (File jar : libJars) {
+            allJars.add(jar.getAbsolutePath());
+        }
+        File appZip = Utilities.getHyracksArchive(applicationName, allJars);
+        hcc.createApplication(applicationName, appZip);
+    }
+
+    public void destroyApplication(String jarFile) throws Exception {
+        hcc.destroyApplication(applicationName);
+    }
+
+}
+
+class FileFilter implements FilenameFilter {
+    private String ext;
+
+    public FileFilter(String ext) {
+        this.ext = "." + ext;
+    }
+
+    public boolean accept(File dir, String name) {
+        return name.endsWith(ext);
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
new file mode 100644
index 0000000..85f3e6b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.hadoop.config;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+
+public class ConfigurationFactory implements IConfigurationFactory {
+    private static final long serialVersionUID = 1L;
+    private final byte[] data;
+
+    public ConfigurationFactory(Configuration conf) {
+        try {
+            data = SerDeUtils.serialize(conf);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public Configuration createConfiguration() throws HyracksDataException {
+        try {
+            Configuration conf = new Configuration();
+            SerDeUtils.deserialize(conf, data);
+            conf.setClassLoader(this.getClass().getClassLoader());
+            return conf;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/Message.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/Message.java
new file mode 100644
index 0000000..38f12fe
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/Message.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.hadoop.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+public class Message<I extends Writable, M extends Writable> implements Writable {
+    private I receiverId;
+    private M body;
+    private Configuration conf;
+
+    public Message() {
+    }
+
+    public Message(I receiverId, M body) {
+        this.receiverId = receiverId;
+        this.body = body;
+    }
+
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        if (this.receiverId == null && this.body == null) {
+            setClass((Class<I>) BspUtils.getVertexIndexClass(getConf()),
+                    (Class<M>) BspUtils.getMessageValueClass(getConf()));
+        }
+        receiverId.readFields(input);
+        body.readFields(input);
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        receiverId.write(output);
+        body.write(output);
+    }
+
+    public I getReceiverVertexId() {
+        return receiverId;
+    }
+
+    public M getMessageBody() {
+        return body;
+    }
+
+    private void setClass(Class<I> idClass, Class<M> bodyClass) {
+        receiverId = ReflectionUtils.newInstance(idClass, getConf());
+        body = ReflectionUtils.newInstance(bodyClass, getConf());
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/MessageList.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/MessageList.java
new file mode 100644
index 0000000..880e75c
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/MessageList.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.hadoop.data;
+
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+
+@SuppressWarnings("rawtypes")
+public class MessageList extends ArrayListWritable<Message> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void setClass() {
+        setClass(Message.class);
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
new file mode 100644
index 0000000..9291e94
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -0,0 +1,441 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.ReflectionUtils;
+import edu.uci.ics.pregelix.core.base.IJobGen;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.jobgen.provider.NormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
+import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public abstract class JobGen implements IJobGen {
+    private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
+    protected static final int MB = 1048576;
+    protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+    protected static final int frameSize = 65536;
+    protected static final int maxFrameSize = (int) (((long) 32 * MB) / frameSize);
+    protected static final int tableSize = 10485767;
+    protected static final String PRIMARY_INDEX = "primary";
+    protected final Configuration conf;
+    protected final PregelixJob giraphJob;
+    protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+    protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+
+    protected static final String SECONDARY_INDEX_ODD = "secondary1";
+    protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+
+    public JobGen(PregelixJob job) {
+        this.conf = job.getConfiguration();
+        this.giraphJob = job;
+        this.initJobConfiguration();
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void initJobConfiguration() {
+        Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
+        List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
+        Type vertexIndexType = parameterTypes.get(0);
+        Type vertexValueType = parameterTypes.get(1);
+        Type edgeValueType = parameterTypes.get(2);
+        Type messageValueType = parameterTypes.get(3);
+        conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
+        conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
+        conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
+        conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public JobSpecification generateCreatingJob() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        JobSpecification spec = new JobSpecification();
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, btreeCreate);
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public JobSpecification generateLoadingJob() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        /**
+         * the graph file scan operator and use count constraint first, will use
+         * absolute constraint later
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+            LOGGER.info("number of splits: " + splits.size());
+            for (InputSplit split : splits)
+                LOGGER.info(split.toString());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+                confFactory);
+        ClusterConfig.setLocationConstraint(spec, scanner, splits);
+
+        /**
+         * construct sort operator
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+                .getAscINormalizedKeyComputerFactory(vertexIdClass);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameSize, sortFields,
+                nkmFactory, comparatorFactories, recordDescriptor);
+        ClusterConfig.setLocationConstraint(spec, sorter);
+
+        /**
+         * construct tree bulk load operator
+         */
+        int[] fieldPermutation = new int[2];
+        fieldPermutation[0] = 0;
+        fieldPermutation[1] = 1;
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+        return spec;
+    }
+
+    @Override
+    public JobSpecification generateJob(int iteration) throws HyracksException {
+        if (iteration <= 0)
+            throw new IllegalStateException("iteration number cannot be less than 1");
+        if (iteration == 1)
+            return generateFirstIteration(iteration);
+        else
+            return generateNonFirstIteration(iteration);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+        JobSpecification spec = new JobSpecification();
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        /**
+         * the graph file scan operator and use count constraint first, will use
+         * absolute constraint later
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+                confFactory);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner, splits.size());
+
+        /**
+         * construct sort operator
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+                .getAscINormalizedKeyComputerFactory(vertexIdClass);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
+                nkmFactory, comparatorFactories, recordDescriptor);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter, splits.size());
+
+        /**
+         * construct write file operator
+         */
+        FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
+                resultFileSplitProvider, preHookFactory, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+                comparatorFactories), sorter, 0, writer, 0);
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct btree search operator
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct write file operator
+         */
+        FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
+                resultFileSplitProvider, preHookFactory, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        /**
+         * connect operator descriptors
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+                comparatorFactories), scanner, 0, writer, 0);
+        spec.setFrameSize(frameSize);
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanIndexWriteGraph() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct btree search operator
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct write file operator
+         */
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+        ClusterConfig.setLocationConstraint(spec, writer);
+
+        /**
+         * connect operator descriptors
+         */
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+        return spec;
+    }
+
+    /***
+     * drop the sindex
+     * 
+     * @return JobSpecification
+     * @throws HyracksException
+     */
+    protected JobSpecification dropIndex(String indexName) throws HyracksException {
+        JobSpecification spec = new JobSpecification();
+
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+        TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
+                treeRegistryProvider, fileSplitProvider);
+
+        ClusterConfig.setLocationConstraint(spec, drop);
+        spec.addRoot(drop);
+        return spec;
+    }
+
+    /** generate non-first iteration job */
+    protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
+
+    /** generate first iteration job */
+    protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+
+    /** generate clean-up job */
+    public abstract JobSpecification[] generateCleanup() throws HyracksException;
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
new file mode 100644
index 0000000..12e1cd7
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -0,0 +1,418 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenInnerJoin extends JobGen {
+
+    public JobGenInnerJoin(PregelixJob job) {
+        super(job);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct drop index operator
+         */
+        IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        /**
+         * construct btree search operator
+         */
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+
+        /**
+         * construct compute operator
+         */
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+
+        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+                inputRdFactory, 3, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage,
+                rdDummy, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+        /**
+         * construct bulk-load index operator
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+                WritableComparator.get(vertexIdClass).getClass());
+        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProvider, interiorFrameFactory,
+                leafFrameFactory, typeTraits, indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
+                new BTreeDataflowHelperFactory());
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /**
+         * construct unnest operator
+         */
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        /**
+         * construct local sort operator
+         */
+        int[] keyFields = new int[] { 0 };
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /**
+         * do pre- & post- super step
+         */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 2, btreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+                localGby, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+        spec.addRoot(emptySink);
+        spec.addRoot(btreeBulkLoad);
+        spec.addRoot(terminateWriter);
+
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * source aggregate
+         */
+        int[] keyFields = new int[] { 0 };
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct pre-superstep
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+        /**
+         * construct the index-set-union operator
+         */
+        String readFile = iteration % 2 == 0 ? SECONDARY_INDEX_ODD : SECONDARY_INDEX_EVEN;
+        IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
+                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
+                leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
+                new BTreeDataflowHelperFactory(), true);
+        ClusterConfig.setLocationConstraint(spec, setUnion);
+
+        /**
+         * construct index-join-function-update operator
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+        IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, join);
+
+        /**
+         * construct bulk-load index operator
+         */
+        int fieldPermutation[] = new int[] { 0, 1 };
+        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+                WritableComparator.get(vertexIdClass).getClass());
+        String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
+        IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderWrite, interiorFrameFactory,
+                leafFrameFactory, typeTraits, indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
+                new BTreeDataflowHelperFactory());
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /**
+         * construct local sort operator
+         */
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, setUnion, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), setUnion, 0, join, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 2, btreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+                localGby, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+        spec.addRoot(emptySink);
+        spec.addRoot(btreeBulkLoad);
+        spec.addRoot(terminateWriter);
+
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    public JobSpecification[] generateCleanup() throws HyracksException {
+        JobSpecification[] cleanups = new JobSpecification[3];
+        cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+        cleanups[1] = this.dropIndex(SECONDARY_INDEX_ODD);
+        cleanups[2] = this.dropIndex(SECONDARY_INDEX_EVEN);
+        return cleanups;
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
new file mode 100644
index 0000000..e1944eb
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -0,0 +1,366 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MsgListNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenOuterJoin extends JobGen {
+
+    public JobGenOuterJoin(PregelixJob job) {
+        super(job);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct btree search operator
+         */
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+
+        /**
+         * construct compute operator
+         */
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+                inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        /**
+         * construct local sort operator
+         */
+        int[] keyFields = new int[] { 0 };
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global group-by operator
+         */
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink2);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+                localGby, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink2);
+
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * source aggregate
+         */
+        int[] keyFields = new int[] { 0 };
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct pre-superstep hook
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+        /**
+         * construct index join function update operator
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
+        nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
+        nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
+
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+        IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
+                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, join);
+
+        /**
+         * construct local sort operator
+         */
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+                localGby, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink);
+
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    public JobSpecification[] generateCleanup() throws HyracksException {
+        JobSpecification[] cleanups = new JobSpecification[1];
+        cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+        return cleanups;
+    }
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
new file mode 100644
index 0000000..80d11ab
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -0,0 +1,345 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.NonCombinerConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MsgListNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenOuterJoinSingleSort extends JobGen {
+
+    public JobGenOuterJoinSingleSort(PregelixJob job) {
+        super(job);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct btree search operator
+         */
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+
+        /**
+         * construct compute operator
+         */
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+                inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct global sort operator
+         */
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        int[] keyFields = new int[] { 0 };
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, globalSort);
+
+        /**
+         * construct global group-by operator
+         */
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink2);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 0, globalSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink2);
+
+        spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * source aggregate
+         */
+        int[] keyFields = new int[] { 0 };
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct pre-superstep hook
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+        /**
+         * construct index join function update operator
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
+        nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
+        nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
+
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+        IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
+                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, join);
+
+        /**
+         * construct global sort operator
+         */
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, globalSort);
+
+        /**
+         * construct global group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 0, globalSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink);
+
+        spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    public JobSpecification[] generateCleanup() throws HyracksException {
+        JobSpecification[] cleanups = new JobSpecification[1];
+        cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+        return cleanups;
+    }
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
new file mode 100644
index 0000000..f1f89b6
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -0,0 +1,375 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MsgListNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenOuterJoinSort extends JobGen {
+
+    public JobGenOuterJoinSort(PregelixJob job) {
+        super(job);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct btree search operator
+         */
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+
+        /**
+         * construct compute operator
+         */
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+                inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        /**
+         * construct local sort operator
+         */
+        int[] keyFields = new int[] { 0 };
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global sort operator
+         */
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, globalSort);
+
+        /**
+         * construct global group-by operator
+         */
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink2);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink2);
+
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * source aggregate
+         */
+        int[] keyFields = new int[] { 0 };
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct pre-superstep hook
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+        /**
+         * construct index join function update operator
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
+        nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
+        nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
+
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+        IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
+                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, join);
+
+        /**
+         * construct local sort operator
+         */
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global sort operator
+         */
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, globalSort);
+
+        /**
+         * construct global group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink);
+        return spec;
+    }
+
+    @Override
+    public JobSpecification[] generateCleanup() throws HyracksException {
+        JobSpecification[] cleanups = new JobSpecification[1];
+        cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+        return cleanups;
+    }
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
new file mode 100644
index 0000000..608980c
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
@@ -0,0 +1,46 @@
+package edu.uci.ics.pregelix.core.jobgen;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.pregelix.core.jobgen.provider.NormalizedKeyComputerFactoryProvider;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class JobGenUtil {
+
+    /**
+     * get normalized key factory for an iteration, to sort messages iteration
+     * 1: asc order iteration 2: desc order
+     * 
+     * @param iteration
+     * @param keyClass
+     * @return
+     */
+    public static INormalizedKeyComputerFactory getINormalizedKeyComputerFactory(int iteration, Class keyClass) {
+        return NormalizedKeyComputerFactoryProvider.INSTANCE.getAscINormalizedKeyComputerFactory(keyClass);
+    }
+
+    /**
+     * get comparator for an iteration, to sort messages iteration 1: asc order
+     * iteration 0: desc order
+     * 
+     * @param iteration
+     * @param keyClass
+     * @return
+     */
+    public static IBinaryComparatorFactory getIBinaryComparatorFactory(int iteration, Class keyClass) {
+        return new WritableComparingBinaryComparatorFactory(keyClass);
+    }
+
+    /**
+     * get the B-tree scan order for an iteration iteration 1: desc order,
+     * backward scan iteration 2: asc order, forward scan
+     * 
+     * @param iteration
+     * @return
+     */
+    public static boolean getForwardScan(int iteration) {
+        return true;
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
new file mode 100644
index 0000000..a22b6f4
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen.clusterconfig;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class ClusterConfig {
+
+    private static String[] NCs;
+    private static String propertiesPath = "conf/stores.properties";
+    private static Map<String, List<String>> ipToNcMapping;
+    private static String[] stores;
+
+    /**
+     * let tests set config path to be whatever
+     * 
+     * @param confPath
+     * @param propertiesPath
+     */
+    public static void setStorePath(String propertiesPath) {
+        ClusterConfig.propertiesPath = propertiesPath;
+    }
+
+    /**
+     * get NC names running on one IP address
+     * 
+     * @param ipAddress
+     * @return
+     * @throws HyracksDataException
+     */
+    public static List<String> getNCNames(String ipAddress) throws HyracksException {
+        return ipToNcMapping.get(ipAddress);
+    }
+
+    /**
+     * get file split provider
+     * 
+     * @param jobId
+     * @return
+     * @throws HyracksDataException
+     */
+    public static IFileSplitProvider getFileSplitProvider(String jobId, String indexName) throws HyracksException {
+        if (stores == null) {
+            loadStores();
+        }
+        FileSplit[] fileSplits = new FileSplit[stores.length * NCs.length];
+        int i = 0;
+        for (String nc : NCs) {
+            for (String st : stores) {
+                FileSplit split = new FileSplit(nc, st + File.separator + nc + "-data" + File.separator + jobId
+                        + File.separator + indexName);
+                fileSplits[i++] = split;
+            }
+        }
+        return new ConstantFileSplitProvider(fileSplits);
+    }
+
+    private static void loadStores() throws HyracksException {
+        Properties properties = new Properties();
+        try {
+            properties.load(new FileInputStream(propertiesPath));
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        String store = properties.getProperty("store");
+        stores = store.split(";");
+    }
+
+    /**
+     * set location constraint
+     * 
+     * @param spec
+     * @param operator
+     * @throws HyracksDataException
+     */
+    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator,
+            List<InputSplit> splits) throws HyracksException {
+        if (stores == null) {
+            loadStores();
+        }
+        int count = splits.size();
+        String[] locations = new String[splits.size()];
+        Random random = new Random(System.currentTimeMillis());
+        for (int i = 0; i < splits.size(); i++) {
+            try {
+                String[] loc = splits.get(i).getLocations();
+                Collections.shuffle(Arrays.asList(loc), random);
+                if (loc.length > 0) {
+                    InetAddress[] allIps = InetAddress.getAllByName(loc[0]);
+                    for (InetAddress ip : allIps) {
+                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+                            List<String> ncs = ipToNcMapping.get(ip.getHostAddress());
+                            int pos = random.nextInt(ncs.size());
+                            locations[i] = ncs.get(pos);
+                        } else {
+                            int pos = random.nextInt(NCs.length);
+                            locations[i] = NCs[pos];
+                        }
+                    }
+                } else {
+                    int pos = random.nextInt(NCs.length);
+                    locations[i] = NCs[pos];
+                }
+            } catch (IOException e) {
+                throw new HyracksException(e);
+            } catch (InterruptedException e) {
+                throw new HyracksException(e);
+            }
+        }
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
+    }
+
+    /**
+     * set location constraint
+     * 
+     * @param spec
+     * @param operator
+     * @throws HyracksDataException
+     */
+    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
+            throws HyracksException {
+        if (stores == null) {
+            loadStores();
+        }
+        int count = 0;
+        String[] locations = new String[NCs.length * stores.length];
+        for (String nc : NCs) {
+            for (int i = 0; i < stores.length; i++) {
+                locations[count] = nc;
+                count++;
+            }
+        }
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+    }
+
+    /**
+     * set location constraint
+     * 
+     * @param spec
+     * @param operator
+     * @throws HyracksDataException
+     */
+    public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {
+        if (stores == null) {
+            loadStores();
+        }
+        int count = NCs.length * stores.length;
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
+    }
+
+    public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
+        try {
+            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+            NCs = new String[ncNameToNcInfos.size()];
+            ipToNcMapping = new HashMap<String, List<String>>();
+            int i = 0;
+            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
+                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
+                        .getHostAddress();
+                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
+                if (matchedNCs == null) {
+                    matchedNCs = new ArrayList<String>();
+                    ipToNcMapping.put(ipAddr, matchedNCs);
+                }
+                matchedNCs.add(entry.getKey());
+                NCs[i] = entry.getKey();
+                i++;
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java
new file mode 100644
index 0000000..1c331c0
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.pregelix.core.jobgen.provider;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.pregelix.core.base.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.pregelix.runtime.touchpoint.VLongAscNormalizedKeyComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VLongDescNormalizedKeyComputerFactory;
+
+public class NormalizedKeyComputerFactoryProvider implements INormalizedKeyComputerFactoryProvider {
+
+    public static INormalizedKeyComputerFactoryProvider INSTANCE = new NormalizedKeyComputerFactoryProvider();
+
+    private NormalizedKeyComputerFactoryProvider() {
+
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory(Class keyClass) {
+        if (keyClass.getName().indexOf("VLongWritable") > 0)
+            return new VLongAscNormalizedKeyComputerFactory();
+        else
+            return null;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public INormalizedKeyComputerFactory getDescINormalizedKeyComputerFactory(Class keyClass) {
+        if (keyClass.getName().indexOf("VLongWritable") > 0)
+            return new VLongDescNormalizedKeyComputerFactory();
+        else
+            return null;
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
new file mode 100644
index 0000000..d1d927d
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+
+public class WritableRecordDescriptorFactory implements IRecordDescriptorFactory {
+    private static final long serialVersionUID = 1L;
+    private String[] fieldClasses;
+
+    public WritableRecordDescriptorFactory(String... fieldClasses) {
+        this.fieldClasses = fieldClasses;
+    }
+
+    @Override
+    public RecordDescriptor createRecordDescriptor() throws HyracksDataException {
+        try {
+            return DataflowUtils.getRecordDescriptorFromWritableClasses(fieldClasses);
+        } catch (HyracksException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/BufferSerDeUtils.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/BufferSerDeUtils.java
new file mode 100644
index 0000000..3f15197
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/BufferSerDeUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+public class BufferSerDeUtils {
+
+    public static double getDouble(byte[] bytes, int offset) {
+        return Double.longBitsToDouble(getLong(bytes, offset));
+    }
+
+    public static float getFloat(byte[] bytes, int offset) {
+        return Float.intBitsToFloat(getInt(bytes, offset));
+    }
+
+    public static boolean getBoolean(byte[] bytes, int offset) {
+        if (bytes[offset] == 0)
+            return false;
+        else
+            return true;
+    }
+
+    public static int getInt(byte[] bytes, int offset) {
+        return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
+                + ((bytes[offset + 3] & 0xff) << 0);
+    }
+
+    public static long getLong(byte[] bytes, int offset) {
+        return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
+                + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
+                + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
+                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+    }
+
+    public static void writeBoolean(boolean value, byte[] bytes, int offset) {
+        if (value)
+            bytes[offset] = (byte) 1;
+        else
+            bytes[offset] = (byte) 0;
+    }
+
+    public static void writeInt(int value, byte[] bytes, int offset) {
+        bytes[offset++] = (byte) (value >> 24);
+        bytes[offset++] = (byte) (value >> 16);
+        bytes[offset++] = (byte) (value >> 8);
+        bytes[offset++] = (byte) (value);
+    }
+
+    public static void writeLong(long value, byte[] bytes, int offset) {
+        bytes[offset++] = (byte) (value >> 56);
+        bytes[offset++] = (byte) (value >> 48);
+        bytes[offset++] = (byte) (value >> 40);
+        bytes[offset++] = (byte) (value >> 32);
+        bytes[offset++] = (byte) (value >> 24);
+        bytes[offset++] = (byte) (value >> 16);
+        bytes[offset++] = (byte) (value >> 8);
+        bytes[offset++] = (byte) (value);
+    }
+
+    public static void writeDouble(double value, byte[] bytes, int offset) {
+        long lValue = Double.doubleToLongBits(value);
+        writeLong(lValue, bytes, offset);
+    }
+
+    public static void writeFloat(float value, byte[] bytes, int offset) {
+        int iValue = Float.floatToIntBits(value);
+        writeInt(iValue, bytes, offset);
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataBalancer.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataBalancer.java
new file mode 100644
index 0000000..d0cff48
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataBalancer.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+@SuppressWarnings("deprecation")
+public class DataBalancer {
+
+    public static class MapRecordOnly extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
+
+        public void map(LongWritable id, Text inputValue, OutputCollector<LongWritable, Text> output, Reporter reporter)
+                throws IOException {
+            output.collect(id, inputValue);
+        }
+    }
+
+    public static class ReduceRecordOnly extends MapReduceBase implements
+            Reducer<LongWritable, Text, NullWritable, Text> {
+
+        NullWritable key = NullWritable.get();
+
+        public void reduce(LongWritable inputKey, Iterator<Text> inputValue,
+                OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+            while (inputValue.hasNext())
+                output.collect(key, inputValue.next());
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        JobConf job = new JobConf(DataBalancer.class);
+
+        job.setJobName(DataBalancer.class.getSimpleName());
+        job.setMapperClass(MapRecordOnly.class);
+        job.setReducerClass(ReduceRecordOnly.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(Text.class);
+
+        job.setInputFormat(TextInputFormat.class);
+        FileInputFormat.setInputPaths(job, args[0]);
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.setNumReduceTasks(Integer.parseInt(args[2]));
+
+        if (args.length > 3) {
+            if (args[3].startsWith("bzip"))
+                FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
+            if (args[3].startsWith("gz"))
+                FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        }
+        JobClient.runJob(job);
+    }
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataGenerator.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataGenerator.java
new file mode 100644
index 0000000..aa63edf
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataGenerator.java
@@ -0,0 +1,213 @@
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * generate graph data from a base dataset
+ * 
+ */
+@SuppressWarnings("deprecation")
+public class DataGenerator {
+
+    public static class MapMaxId extends MapReduceBase implements
+            Mapper<LongWritable, Text, NullWritable, VLongWritable> {
+        private NullWritable key = NullWritable.get();
+        private VLongWritable value = new VLongWritable();
+
+        @Override
+        public void map(LongWritable id, Text inputValue, OutputCollector<NullWritable, VLongWritable> output,
+                Reporter reporter) throws IOException {
+            String[] vertices = inputValue.toString().split(" ");
+            long max = Long.parseLong(vertices[0]);
+            for (int i = 1; i < vertices.length; i++) {
+                long vid = Long.parseLong(vertices[i]);
+                if (vid > max)
+                    max = vid;
+            }
+            value.set(max);
+            output.collect(key, value);
+        }
+    }
+
+    public static class ReduceMaxId extends MapReduceBase implements
+            Reducer<NullWritable, VLongWritable, NullWritable, Text> {
+
+        private NullWritable key = NullWritable.get();
+        private long max = Long.MIN_VALUE;
+        private OutputCollector<NullWritable, Text> output;
+
+        @Override
+        public void reduce(NullWritable inputKey, Iterator<VLongWritable> inputValue,
+                OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+            while (inputValue.hasNext()) {
+                long vid = inputValue.next().get();
+                if (vid > max)
+                    max = vid;
+            }
+            if (this.output == null)
+                this.output = output;
+
+        }
+
+        @Override
+        public void close() throws IOException {
+            output.collect(key, new Text(new VLongWritable(max).toString()));
+        }
+    }
+
+    public static class CombineMaxId extends MapReduceBase implements
+            Reducer<NullWritable, VLongWritable, NullWritable, VLongWritable> {
+
+        private NullWritable key = NullWritable.get();
+        private long max = Long.MIN_VALUE;
+        private OutputCollector<NullWritable, VLongWritable> output;
+
+        @Override
+        public void reduce(NullWritable inputKey, Iterator<VLongWritable> inputValue,
+                OutputCollector<NullWritable, VLongWritable> output, Reporter reporter) throws IOException {
+            while (inputValue.hasNext()) {
+                long vid = inputValue.next().get();
+                if (vid > max)
+                    max = vid;
+            }
+            if (this.output == null)
+                this.output = output;
+        }
+
+        public void close() throws IOException {
+            output.collect(key, new VLongWritable(max));
+        }
+    }
+
+    public static class MapRecordGen extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
+
+        private long maxId = 0;
+        private Text text = new Text();
+        private int x = 2;
+
+        @Override
+        public void configure(JobConf conf) {
+            try {
+                x = conf.getInt("hyracks.x", 2);
+                String fileName = conf.get("hyracks.maxid.file");
+                FileSystem dfs = FileSystem.get(conf);
+                dfs.delete(new Path(fileName + "/_SUCCESS"), true);
+                dfs.delete(new Path(fileName + "/_logs"), true);
+                FileStatus[] files = dfs.listStatus(new Path(fileName));
+
+                for (int i = 0; i < files.length; i++) {
+                    if (!files[i].isDir()) {
+                        DataInputStream input = dfs.open(files[i].getPath());
+                        String id = input.readLine();
+                        maxId = Long.parseLong(id) + 1;
+                        input.close();
+                    }
+                }
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        @Override
+        public void map(LongWritable id, Text inputValue, OutputCollector<LongWritable, Text> output, Reporter reporter)
+                throws IOException {
+            String[] vertices = inputValue.toString().split(" ");
+
+            /**
+             * generate data x times
+             */
+            for (int k = 0; k < x; k++) {
+                long max = maxId * k;
+                StringBuilder sb = new StringBuilder();
+                for (int i = 0; i < vertices.length - 1; i++) {
+                    long vid = Long.parseLong(vertices[i]) + max;
+                    sb.append(vid);
+                    sb.append(" ");
+                }
+                long vid = Long.parseLong(vertices[vertices.length - 1]) + max;
+                sb.append(vid);
+                text.set(sb.toString().getBytes());
+                output.collect(id, text);
+            }
+        }
+    }
+
+    public static class ReduceRecordGen extends MapReduceBase implements
+            Reducer<LongWritable, Text, NullWritable, Text> {
+
+        private NullWritable key = NullWritable.get();
+
+        public void reduce(LongWritable inputKey, Iterator<Text> inputValue,
+                OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+            while (inputValue.hasNext())
+                output.collect(key, inputValue.next());
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        JobConf job = new JobConf(DataGenerator.class);
+        FileSystem dfs = FileSystem.get(job);
+        String maxFile = "/maxtemp";
+        dfs.delete(new Path(maxFile), true);
+
+        job.setJobName(DataGenerator.class.getSimpleName() + "max ID");
+        job.setMapperClass(MapMaxId.class);
+        job.setCombinerClass(CombineMaxId.class);
+        job.setReducerClass(ReduceMaxId.class);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(VLongWritable.class);
+
+        job.setInputFormat(TextInputFormat.class);
+        FileInputFormat.setInputPaths(job, args[0]);
+        FileOutputFormat.setOutputPath(job, new Path(maxFile));
+        job.setNumReduceTasks(1);
+        JobClient.runJob(job);
+
+        job = new JobConf(DataGenerator.class);
+        job.set("hyracks.maxid.file", maxFile);
+        job.setInt("hyracks.x", Integer.parseInt(args[2]));
+        dfs.delete(new Path(args[1]), true);
+
+        job.setJobName(DataGenerator.class.getSimpleName());
+        job.setMapperClass(MapRecordGen.class);
+        job.setReducerClass(ReduceRecordGen.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(Text.class);
+
+        job.setInputFormat(TextInputFormat.class);
+        FileInputFormat.setInputPaths(job, args[0]);
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.setNumReduceTasks(Integer.parseInt(args[3]));
+
+        if (args.length > 4) {
+            if (args[4].startsWith("bzip"))
+                FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
+            if (args[4].startsWith("gz"))
+                FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        }
+        JobClient.runJob(job);
+    }
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
new file mode 100644
index 0000000..f17802b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.simpleagg.AccumulatingAggregatorFactory;
+import edu.uci.ics.pregelix.runtime.simpleagg.AggregationFunctionFactory;
+
+public class DataflowUtils {
+
+    public enum AggregationMode {
+        PARTIAL, FINAL
+    }
+
+    @SuppressWarnings("unchecked")
+    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(String className1, String className2)
+            throws HyracksException {
+        RecordDescriptor recordDescriptor = null;
+        try {
+            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                    (Class<? extends Writable>) Class.forName(className1),
+                    (Class<? extends Writable>) Class.forName(className2));
+        } catch (ClassNotFoundException cnfe) {
+            throw new HyracksException(cnfe);
+        }
+        return recordDescriptor;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public static RecordDescriptor getRecordDescriptorFromWritableClasses(String... classNames) throws HyracksException {
+        RecordDescriptor recordDescriptor = null;
+        ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
+        try {
+            int i = 0;
+            for (String className : classNames)
+                serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) Class
+                        .forName(className));
+        } catch (ClassNotFoundException cnfe) {
+            throw new HyracksException(cnfe);
+        }
+        recordDescriptor = new RecordDescriptor(serdes);
+        return recordDescriptor;
+    }
+
+    public static IRecordDescriptorFactory getWritableRecordDescriptorFactoryFromWritableClasses(String... classNames)
+            throws HyracksException {
+        IRecordDescriptorFactory rdFactory = new WritableRecordDescriptorFactory(classNames);
+        return rdFactory;
+    }
+
+    public static IAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf, boolean isFinal) {
+        IAggregateFunctionFactory aggFuncFactory = new AggregationFunctionFactory(new ConfigurationFactory(conf),
+                isFinal);
+        IAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
+                new IAggregateFunctionFactory[] { aggFuncFactory }, new int[] { 0 }, new int[] {});
+        return aggregatorFactory;
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
new file mode 100644
index 0000000..ed04746
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+
+public class PregelixHyracksIntegrationUtil {
+
+    public static final String NC1_ID = "nc1";
+    public static final String NC2_ID = "nc2";
+
+    public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
+    public static final int TEST_HYRACKS_CC_PORT = 1099;
+    public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
+    public static final String CC_HOST = "localhost";
+
+    public static final int FRAME_SIZE = 65536;
+
+    private static ClusterControllerService cc;
+    private static NodeControllerService nc1;
+    private static NodeControllerService nc2;
+    private static IHyracksClientConnection hcc;
+
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.clientNetIpAddress = CC_HOST;
+        ccConfig.clusterNetIpAddress = CC_HOST;
+        ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
+        ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
+        ccConfig.defaultMaxJobAttempts = 0;
+        ccConfig.jobHistorySize = 10;
+
+        // cluster controller
+        cc = new ClusterControllerService(ccConfig);
+        cc.start();
+
+        // two node controllers
+        NCConfig ncConfig1 = new NCConfig();
+        ncConfig1.ccHost = "localhost";
+        ncConfig1.clusterNetIPAddress = "localhost";
+        ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
+        ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.nodeId = NC1_ID;
+        nc1 = new NodeControllerService(ncConfig1);
+        nc1.start();
+
+        NCConfig ncConfig2 = new NCConfig();
+        ncConfig2.ccHost = "localhost";
+        ncConfig2.clusterNetIPAddress = "localhost";
+        ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
+        ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.nodeId = NC2_ID;
+        nc2 = new NodeControllerService(ncConfig2);
+        nc2.start();
+
+        // hyracks connection
+        hcc = new HyracksConnection(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
+        ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
+    }
+
+    public static void destroyApp(String hyracksAppName) throws Exception {
+        hcc.destroyApplication(hyracksAppName);
+    }
+
+    public static void createApp(String hyracksAppName) throws Exception {
+        hcc.createApplication(hyracksAppName, null);
+    }
+
+    public static void deinit() throws Exception {
+        nc2.stop();
+        nc1.stop();
+        cc.stop();
+    }
+
+    public static void runJob(JobSpecification spec, String appName) throws Exception {
+        spec.setFrameSize(FRAME_SIZE);
+        JobId jobId = hcc.startJob(appName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        hcc.waitForCompletion(jobId);
+    }
+
+}
diff --git a/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java b/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
new file mode 100644
index 0000000..1fdfde6
--- /dev/null
+++ b/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Names a file or directory in a {@link FileSystem}. Path strings use slash as
+ * the directory separator. A path string is absolute if it begins with a slash.
+ */
+@SuppressWarnings("rawtypes")
+public class Path implements Comparable, Serializable {
+    private static final long serialVersionUID = 1L;
+    /** The directory separator, a slash. */
+    public static final String SEPARATOR = "/";
+    public static final char SEPARATOR_CHAR = '/';
+
+    public static final String CUR_DIR = ".";
+
+    static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
+
+    private URI uri; // a hierarchical uri
+
+    /** Resolve a child path against a parent path. */
+    public Path(String parent, String child) {
+        this(new Path(parent), new Path(child));
+    }
+
+    /** Resolve a child path against a parent path. */
+    public Path(Path parent, String child) {
+        this(parent, new Path(child));
+    }
+
+    /** Resolve a child path against a parent path. */
+    public Path(String parent, Path child) {
+        this(new Path(parent), child);
+    }
+
+    /** Resolve a child path against a parent path. */
+    public Path(Path parent, Path child) {
+        // Add a slash to parent's path so resolution is compatible with URI's
+        URI parentUri = parent.uri;
+        String parentPath = parentUri.getPath();
+        if (!(parentPath.equals("/") || parentPath.equals("")))
+            try {
+                parentUri = new URI(parentUri.getScheme(), parentUri.getAuthority(), parentUri.getPath() + "/", null,
+                        parentUri.getFragment());
+            } catch (URISyntaxException e) {
+                throw new IllegalArgumentException(e);
+            }
+        URI resolved = parentUri.resolve(child.uri);
+        initialize(resolved.getScheme(), resolved.getAuthority(), normalizePath(resolved.getPath()),
+                resolved.getFragment());
+    }
+
+    private void checkPathArg(String path) {
+        // disallow construction of a Path from an empty string
+        if (path == null) {
+            throw new IllegalArgumentException("Can not create a Path from a null string");
+        }
+        if (path.length() == 0) {
+            throw new IllegalArgumentException("Can not create a Path from an empty string");
+        }
+    }
+
+    /**
+     * Construct a path from a String. Path strings are URIs, but with unescaped
+     * elements and some additional normalization.
+     */
+    public Path(String pathString) {
+        checkPathArg(pathString);
+
+        // We can't use 'new URI(String)' directly, since it assumes things are
+        // escaped, which we don't require of Paths.
+
+        // add a slash in front of paths with Windows drive letters
+        if (hasWindowsDrive(pathString, false))
+            pathString = "/" + pathString;
+
+        // parse uri components
+        String scheme = null;
+        String authority = null;
+
+        int start = 0;
+
+        // parse uri scheme, if any
+        int colon = pathString.indexOf(':');
+        int slash = pathString.indexOf('/');
+        if ((colon != -1) && ((slash == -1) || (colon < slash))) { // has a
+                                                                   // scheme
+            scheme = pathString.substring(0, colon);
+            start = colon + 1;
+        }
+
+        // parse uri authority, if any
+        if (pathString.startsWith("//", start) && (pathString.length() - start > 2)) { // has
+                                                                                       // authority
+            int nextSlash = pathString.indexOf('/', start + 2);
+            int authEnd = nextSlash > 0 ? nextSlash : pathString.length();
+            authority = pathString.substring(start + 2, authEnd);
+            start = authEnd;
+        }
+
+        // uri path is the rest of the string -- query & fragment not supported
+        String path = pathString.substring(start, pathString.length());
+
+        initialize(scheme, authority, path, null);
+    }
+
+    /** Construct a Path from components. */
+    public Path(String scheme, String authority, String path) {
+        checkPathArg(path);
+        initialize(scheme, authority, path, null);
+    }
+
+    /**
+     * Construct a path from a URI
+     */
+    public Path(URI aUri) {
+        uri = aUri;
+    }
+
+    private void initialize(String scheme, String authority, String path, String fragment) {
+        try {
+            this.uri = new URI(scheme, authority, normalizePath(path), null, fragment).normalize();
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    private String normalizePath(String path) {
+        // remove double slashes & backslashes
+        if (path.indexOf("//") != -1) {
+            path = path.replace("//", "/");
+        }
+        if (path.indexOf("\\") != -1) {
+            path = path.replace("\\", "/");
+        }
+
+        // trim trailing slash from non-root path (ignoring windows drive)
+        int minLength = hasWindowsDrive(path, true) ? 4 : 1;
+        if (path.length() > minLength && path.endsWith("/")) {
+            path = path.substring(0, path.length() - 1);
+        }
+
+        return path;
+    }
+
+    private boolean hasWindowsDrive(String path, boolean slashed) {
+        if (!WINDOWS)
+            return false;
+        int start = slashed ? 1 : 0;
+        return path.length() >= start + 2
+                && (slashed ? path.charAt(0) == '/' : true)
+                && path.charAt(start + 1) == ':'
+                && ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
+                        .charAt(start) <= 'z'));
+    }
+
+    /** Convert this to a URI. */
+    public URI toUri() {
+        return uri;
+    }
+
+    /** Return the FileSystem that owns this Path. */
+    public FileSystem getFileSystem(Configuration conf) throws IOException {
+        return FileSystem.get(this.toUri(), conf);
+    }
+
+    /** True if the directory of this path is absolute. */
+    public boolean isAbsolute() {
+        int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
+        return uri.getPath().startsWith(SEPARATOR, start);
+    }
+
+    /** Returns the final component of this path. */
+    public String getName() {
+        String path = uri.getPath();
+        int slash = path.lastIndexOf(SEPARATOR);
+        return path.substring(slash + 1);
+    }
+
+    /** Returns the parent of a path or null if at root. */
+    public Path getParent() {
+        String path = uri.getPath();
+        int lastSlash = path.lastIndexOf('/');
+        int start = hasWindowsDrive(path, true) ? 3 : 0;
+        if ((path.length() == start) || // empty path
+                (lastSlash == start && path.length() == start + 1)) { // at root
+            return null;
+        }
+        String parent;
+        if (lastSlash == -1) {
+            parent = CUR_DIR;
+        } else {
+            int end = hasWindowsDrive(path, true) ? 3 : 0;
+            parent = path.substring(0, lastSlash == end ? end + 1 : lastSlash);
+        }
+        return new Path(uri.getScheme(), uri.getAuthority(), parent);
+    }
+
+    /** Adds a suffix to the final name in the path. */
+    public Path suffix(String suffix) {
+        return new Path(getParent(), getName() + suffix);
+    }
+
+    public String toString() {
+        // we can't use uri.toString(), which escapes everything, because we
+        // want
+        // illegal characters unescaped in the string, for glob processing, etc.
+        StringBuffer buffer = new StringBuffer();
+        if (uri.getScheme() != null) {
+            buffer.append(uri.getScheme());
+            buffer.append(":");
+        }
+        if (uri.getAuthority() != null) {
+            buffer.append("//");
+            buffer.append(uri.getAuthority());
+        }
+        if (uri.getPath() != null) {
+            String path = uri.getPath();
+            if (path.indexOf('/') == 0 && hasWindowsDrive(path, true) && // has
+                                                                         // windows
+                                                                         // drive
+                    uri.getScheme() == null && // but no scheme
+                    uri.getAuthority() == null) // or authority
+                path = path.substring(1); // remove slash before drive
+            buffer.append(path);
+        }
+        if (uri.getFragment() != null) {
+            buffer.append("#");
+            buffer.append(uri.getFragment());
+        }
+        return buffer.toString();
+    }
+
+    public boolean equals(Object o) {
+        if (!(o instanceof Path)) {
+            return false;
+        }
+        Path that = (Path) o;
+        return this.uri.equals(that.uri);
+    }
+
+    public int hashCode() {
+        return uri.hashCode();
+    }
+
+    public int compareTo(Object o) {
+        Path that = (Path) o;
+        return this.uri.compareTo(that.uri);
+    }
+
+    /** Return the number of elements in this path. */
+    public int depth() {
+        String path = uri.getPath();
+        int depth = 0;
+        int slash = path.length() == 1 && path.charAt(0) == '/' ? -1 : 0;
+        while (slash != -1) {
+            depth++;
+            slash = path.indexOf(SEPARATOR, slash + 1);
+        }
+        return depth;
+    }
+
+    /** Returns a qualified path object. */
+    public Path makeQualified(FileSystem fs) {
+        Path path = this;
+        if (!isAbsolute()) {
+            path = new Path(fs.getWorkingDirectory(), this);
+        }
+
+        URI pathUri = path.toUri();
+        URI fsUri = fs.getUri();
+
+        String scheme = pathUri.getScheme();
+        String authority = pathUri.getAuthority();
+        String fragment = pathUri.getFragment();
+        if (scheme != null && (authority != null || fsUri.getAuthority() == null))
+            return path;
+
+        if (scheme == null) {
+            scheme = fsUri.getScheme();
+        }
+
+        if (authority == null) {
+            authority = fsUri.getAuthority();
+            if (authority == null) {
+                authority = "";
+            }
+        }
+
+        URI newUri = null;
+        try {
+            newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+        return new Path(newUri);
+    }
+
+    /** Returns a qualified path object. */
+    public Path makeQualified(URI defaultUri, Path workingDir) {
+        Path path = this;
+        if (!isAbsolute()) {
+            path = new Path(workingDir, this);
+        }
+
+        URI pathUri = path.toUri();
+
+        String scheme = pathUri.getScheme();
+        String authority = pathUri.getAuthority();
+        String fragment = pathUri.getFragment();
+
+        if (scheme != null && (authority != null || defaultUri.getAuthority() == null))
+            return path;
+
+        if (scheme == null) {
+            scheme = defaultUri.getScheme();
+        }
+
+        if (authority == null) {
+            authority = defaultUri.getAuthority();
+            if (authority == null) {
+                authority = "";
+            }
+        }
+
+        URI newUri = null;
+        try {
+            newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+        return new Path(newUri);
+    }
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java b/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
new file mode 100644
index 0000000..6643ab5
--- /dev/null
+++ b/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * <code>InputSplit</code> represents the data to be processed by an individual
+ * {@link Mapper}.
+ * 
+ * <p>
+ * Typically, it presents a byte-oriented view on the input and is the
+ * responsibility of {@link RecordReader} of the job to process this and present
+ * a record-oriented view.
+ * 
+ * @see InputFormat
+ * @see RecordReader
+ */
+public abstract class InputSplit implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Get the size of the split, so that the input splits can be sorted by
+     * size.
+     * 
+     * @return the number of bytes in the split
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public abstract long getLength() throws IOException, InterruptedException;
+
+    /**
+     * Get the list of nodes by name where the data for the split would be
+     * local. The locations do not need to be serialized.
+     * 
+     * @return a new array of the node nodes.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public abstract String[] getLocations() throws IOException, InterruptedException;
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/conf/cluster.properties b/pregelix-core/src/main/resources/conf/cluster.properties
new file mode 100644
index 0000000..74a98b0
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/cluster.properties
@@ -0,0 +1,37 @@
+#The CC port for Hyracks clients
+CC_CLIENTPORT=3099
+
+#The CC port for Hyracks cluster management
+CC_CLUSTERPORT=1099
+
+#The directory of hyracks binaries
+HYRACKS_HOME=~/workspace/hyracks_asterix_stabilization
+
+#The tmp directory for cc to install jars
+CCTMP_DIR=/tmp/t1
+
+#The tmp directory for nc to install jars
+NCTMP_DIR=/tmp/t2
+
+#The directory to put cc logs
+CCLOGS_DIR=$CCTMP_DIR/logs
+
+#The directory to put nc logs
+NCLOGS_DIR=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS="/tmp/t3,/tmp/t4"
+
+#The JAVA_HOME
+JAVA_HOME=$JAVA_HOME
+
+#The frame size of the internal dataflow engine
+FRAME_SIZE=65536
+
+#CC JAVA_OPTS
+CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx3g -Djava.util.logging.config.file=logging.properties"
+# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
+
+#NC JAVA_OPTS
+NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx3g -Djava.util.logging.config.file=logging.properties"
+
diff --git a/pregelix-core/src/main/resources/conf/master b/pregelix-core/src/main/resources/conf/master
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/master
@@ -0,0 +1 @@
+localhost
diff --git a/pregelix-core/src/main/resources/conf/slaves b/pregelix-core/src/main/resources/conf/slaves
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/slaves
@@ -0,0 +1 @@
+localhost
diff --git a/pregelix-core/src/main/resources/conf/stores.properties b/pregelix-core/src/main/resources/conf/stores.properties
new file mode 100644
index 0000000..d1a4e10
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/stores.properties
@@ -0,0 +1,2 @@
+#Comma separated directories for storing the partitioned graph on each machine
+store=/tmp/teststore1,/tmp/teststore2
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/hyracks-deployment.properties b/pregelix-core/src/main/resources/hyracks-deployment.properties
new file mode 100644
index 0000000..8c84699
--- /dev/null
+++ b/pregelix-core/src/main/resources/hyracks-deployment.properties
@@ -0,0 +1,17 @@
+#-------------------------------------------------------------------------------
+# /*
+#  * Copyright 2009-2010 by The Regents of the University of California
+#  * Licensed under the Apache License, Version 2.0 (the "License");
+#  * you may not use this file except in compliance with the License.
+#  * you may obtain a copy of the License from
+#  * 
+#  *     http://www.apache.org/licenses/LICENSE-2.0
+#  * 
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  */
+#-------------------------------------------------------------------------------
+nc.bootstrap.class=edu.uci.ics.pregelix.runtime.bootstrap.NCBootstrapImpl
diff --git a/pregelix-core/src/main/resources/scripts/pregelix b/pregelix-core/src/main/resources/scripts/pregelix
new file mode 100644
index 0000000..275175d
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/pregelix
@@ -0,0 +1,115 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+#  Copyright 2001-2006 The Apache Software Foundation.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+# ----------------------------------------------------------------------------
+#
+#   Copyright (c) 2001-2006 The Apache Software Foundation.  All rights
+#   reserved.
+
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+  ls=`ls -ld "$PRG"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+PRGDIR=`dirname "$PRG"`
+BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
+
+
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  Darwin*) darwin=true
+           if [ -z "$JAVA_VERSION" ] ; then
+             JAVA_VERSION="CurrentJDK"
+           else
+             echo "Using Java version: $JAVA_VERSION"
+           fi
+           if [ -z "$JAVA_HOME" ] ; then
+             JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
+           fi
+           ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=`java-config --jre-home`
+  fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# If a specific java binary isn't specified search for the standard 'java' binary
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD=`which java`
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." 1>&2
+  echo "  We cannot execute $JAVACMD" 1>&2
+  exit 1
+fi
+
+if [ -z "$REPO" ]
+then
+  REPO="$BASEDIR"/lib
+fi
+
+cp $BASEDIR"/../a-hadoop-patch.jar "$REPO"/
+
+CLASSPATH=$CLASSPATH_PREFIX:"$BASEDIR"/etc:"$REPO"/a-hadoop-patch.jar:"$REPO"/pregelix-api-0.0.1-SNAPSHOT.jar:"$REPO"/hyracks-dataflow-common-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-api-0.2.2-SNAPSHOT.jar:"$REPO"/json-20090211.jar:"$REPO"/httpclient-4.1-alpha2.jar:"$REPO"/httpcore-4.1-beta1.jar:"$REPO"/commons-logging-1.1.1.jar:"$REPO"/commons-codec-1.3.jar:"$REPO"/args4j-2.0.12.jar:"$REPO"/hyracks-ipc-0.2.2-SNAPSHOT.jar:"$REPO"/commons-lang3-3.1.jar:"$REPO"/hyracks-data-std-0.2.2-SNAPSHOT.jar:"$REPO"/hadoop-core-0.20.2.jar:"$REPO"/commons-cli-1.2.jar:"$REPO"/xmlenc-0.52.jar:"$REPO"/commons-httpclient-3.0.1.jar:"$REPO"/commons-net-1.4.1.jar:"$REPO"/oro-2.0.8.jar:"$REPO"/jetty-6.1.14.jar:"$REPO"/jetty-util-6.1.14.jar:"$REPO"/servlet-api-2.5-6.1.14.jar:"$REPO"/jasper-runtime-5.5.12.jar:"$REPO"/jasper-compiler-5.5.12.jar:"$REPO"/jsp-api-2.1-6.1.14.jar:"$REPO"/jsp-2.1-6.1.14.jar:"$REPO"/core-3.1.1.jar:"$REPO"/ant-1.6.5.jar:"$REPO"/commons-el-1.0.jar:"$REPO"/jets3t-0.7.1.jar:"$REPO"/kfs-0.3.jar:"$REPO"/hsqldb-1.8.0.10.jar:"$REPO"/pregelix-dataflow-std-0.0.1-SNAPSHOT.jar:"$REPO"/pregelix-dataflow-std-base-0.0.1-SNAPSHOT.jar:"$REPO"/hyracks-dataflow-std-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-dataflow-hadoop-0.2.2-SNAPSHOT.jar:"$REPO"/dcache-client-0.0.1.jar:"$REPO"/jetty-client-8.0.0.M0.jar:"$REPO"/jetty-http-8.0.0.RC0.jar:"$REPO"/jetty-io-8.0.0.RC0.jar:"$REPO"/jetty-util-8.0.0.RC0.jar:"$REPO"/hyracks-storage-am-common-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-storage-common-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-storage-am-btree-0.2.2-SNAPSHOT.jar:"$REPO"/btreehelper-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-control-cc-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-control-common-0.2.2-SNAPSHOT.jar:"$REPO"/commons-io-1.3.1.jar:"$REPO"/jetty-server-8.0.0.RC0.jar:"$REPO"/servlet-api-3.0.20100224.jar:"$REPO"/jetty-continuation-8.0.0.RC0.jar:"$REPO"/jetty-webapp-8.0.0.RC0.jar:"$REPO"/jetty-xml-8.0.0.RC0.jar:"$REPO"/jetty-servlet-8.0.0.RC0.jar:"$REPO"/jetty-security-8.0.0.RC0.jar:"$REPO"/wicket-core-1.5.2.jar:"$REPO"/wicket-util-1.5.2.jar:"$REPO"/slf4j-api-1.6.1.jar:"$REPO"/wicket-request-1.5.2.jar:"$REPO"/slf4j-jcl-1.6.3.jar:"$REPO"/hyracks-control-nc-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-net-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-hadoop-compat-0.2.2-SNAPSHOT.jar:"$REPO"/pregelix-dataflow-0.0.1-SNAPSHOT.jar:"$REPO"/pregelix-runtime-0.0.1-SNAPSHOT.jar:"$REPO"/hadoop-test-0.20.2.jar:"$REPO"/ftplet-api-1.0.0.jar:"$REPO"/mina-core-2.0.0-M5.jar:"$REPO"/ftpserver-core-1.0.0.jar:"$REPO"/ftpserver-deprecated-1.0.0-M2.jar:"$REPO"/javax.servlet-api-3.0.1.jar:"$REPO"/pregelix-core-0.0.1-SNAPSHOT.jar
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+  [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
+  [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
+  [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
+fi
+
+exec "$JAVACMD" $JAVA_OPTS  \
+  -classpath "$CLASSPATH" \
+  -Dapp.name="pregelix" \
+  -Dapp.pid="$$" \
+  -Dapp.repo="$REPO" \
+  -Dapp.home="$BASEDIR" \
+  -Dbasedir="$BASEDIR" \
+  org.apache.hadoop.util.RunJar \
+  "$@"
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/scripts/pregelix.bat b/pregelix-core/src/main/resources/scripts/pregelix.bat
new file mode 100644
index 0000000..536e3c8
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/pregelix.bat
@@ -0,0 +1,110 @@
+@REM ----------------------------------------------------------------------------

+@REM  Copyright 2001-2006 The Apache Software Foundation.

+@REM

+@REM  Licensed under the Apache License, Version 2.0 (the "License");

+@REM  you may not use this file except in compliance with the License.

+@REM  You may obtain a copy of the License at

+@REM

+@REM       http://www.apache.org/licenses/LICENSE-2.0

+@REM

+@REM  Unless required by applicable law or agreed to in writing, software

+@REM  distributed under the License is distributed on an "AS IS" BASIS,

+@REM  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+@REM  See the License for the specific language governing permissions and

+@REM  limitations under the License.

+@REM ----------------------------------------------------------------------------

+@REM

+@REM   Copyright (c) 2001-2006 The Apache Software Foundation.  All rights

+@REM   reserved.

+

+@echo off

+

+set ERROR_CODE=0

+

+:init

+@REM Decide how to startup depending on the version of windows

+

+@REM -- Win98ME

+if NOT "%OS%"=="Windows_NT" goto Win9xArg

+

+@REM set local scope for the variables with windows NT shell

+if "%OS%"=="Windows_NT" @setlocal

+

+@REM -- 4NT shell

+if "%eval[2+2]" == "4" goto 4NTArgs

+

+@REM -- Regular WinNT shell

+set CMD_LINE_ARGS=%*

+goto WinNTGetScriptDir

+

+@REM The 4NT Shell from jp software

+:4NTArgs

+set CMD_LINE_ARGS=%$

+goto WinNTGetScriptDir

+

+:Win9xArg

+@REM Slurp the command line arguments.  This loop allows for an unlimited number

+@REM of arguments (up to the command line limit, anyway).

+set CMD_LINE_ARGS=

+:Win9xApp

+if %1a==a goto Win9xGetScriptDir

+set CMD_LINE_ARGS=%CMD_LINE_ARGS% %1

+shift

+goto Win9xApp

+

+:Win9xGetScriptDir

+set SAVEDIR=%CD%

+%0\

+cd %0\..\.. 

+set BASEDIR=%CD%

+cd %SAVEDIR%

+set SAVE_DIR=

+goto repoSetup

+

+:WinNTGetScriptDir

+set BASEDIR=%~dp0\..

+

+:repoSetup

+

+

+if "%JAVACMD%"=="" set JAVACMD=java

+

+if "%REPO%"=="" set REPO=%BASEDIR%\lib

+

+cp $BASEDIR"\..\a-hadoop-patch.jar "$REPO"\

+

+set CLASSPATH="%BASEDIR%"\etc;"%REPO%"\a-hadoop-patch.jar;"%REPO%"\pregelix-api-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-api-0.2.2-SNAPSHOT.jar;"%REPO%"\json-20090211.jar;"%REPO%"\httpclient-4.1-alpha2.jar;"%REPO%"\httpcore-4.1-beta1.jar;"%REPO%"\commons-logging-1.1.1.jar;"%REPO%"\commons-codec-1.3.jar;"%REPO%"\args4j-2.0.12.jar;"%REPO%"\hyracks-ipc-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-lang3-3.1.jar;"%REPO%"\hyracks-data-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hadoop-core-0.20.2.jar;"%REPO%"\commons-cli-1.2.jar;"%REPO%"\xmlenc-0.52.jar;"%REPO%"\commons-httpclient-3.0.1.jar;"%REPO%"\commons-net-1.4.1.jar;"%REPO%"\oro-2.0.8.jar;"%REPO%"\jetty-6.1.14.jar;"%REPO%"\jetty-util-6.1.14.jar;"%REPO%"\servlet-api-2.5-6.1.14.jar;"%REPO%"\jasper-runtime-5.5.12.jar;"%REPO%"\jasper-compiler-5.5.12.jar;"%REPO%"\jsp-api-2.1-6.1.14.jar;"%REPO%"\jsp-2.1-6.1.14.jar;"%REPO%"\core-3.1.1.jar;"%REPO%"\ant-1.6.5.jar;"%REPO%"\commons-el-1.0.jar;"%REPO%"\jets3t-0.7.1.jar;"%REPO%"\kfs-0.3.jar;"%REPO%"\hsqldb-1.8.0.10.jar;"%REPO%"\pregelix-dataflow-std-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-std-base-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-hadoop-0.2.2-SNAPSHOT.jar;"%REPO%"\dcache-client-0.0.1.jar;"%REPO%"\jetty-client-8.0.0.M0.jar;"%REPO%"\jetty-http-8.0.0.RC0.jar;"%REPO%"\jetty-io-8.0.0.RC0.jar;"%REPO%"\jetty-util-8.0.0.RC0.jar;"%REPO%"\hyracks-storage-am-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-am-btree-0.2.2-SNAPSHOT.jar;"%REPO%"\btreehelper-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-cc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-common-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-io-1.3.1.jar;"%REPO%"\jetty-server-8.0.0.RC0.jar;"%REPO%"\servlet-api-3.0.20100224.jar;"%REPO%"\jetty-continuation-8.0.0.RC0.jar;"%REPO%"\jetty-webapp-8.0.0.RC0.jar;"%REPO%"\jetty-xml-8.0.0.RC0.jar;"%REPO%"\jetty-servlet-8.0.0.RC0.jar;"%REPO%"\jetty-security-8.0.0.RC0.jar;"%REPO%"\wicket-core-1.5.2.jar;"%REPO%"\wicket-util-1.5.2.jar;"%REPO%"\slf4j-api-1.6.1.jar;"%REPO%"\wicket-request-1.5.2.jar;"%REPO%"\slf4j-jcl-1.6.3.jar;"%REPO%"\hyracks-control-nc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-net-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-hadoop-compat-0.2.2-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-runtime-0.0.1-SNAPSHOT.jar;"%REPO%"\hadoop-test-0.20.2.jar;"%REPO%"\ftplet-api-1.0.0.jar;"%REPO%"\mina-core-2.0.0-M5.jar;"%REPO%"\ftpserver-core-1.0.0.jar;"%REPO%"\ftpserver-deprecated-1.0.0-M2.jar;"%REPO%"\javax.servlet-api-3.0.1.jar;"%REPO%"\pregelix-core-0.0.1-SNAPSHOT.jar

+goto endInit

+

+@REM Reaching here means variables are defined and arguments have been captured

+:endInit

+

+%JAVACMD% %JAVA_OPTS%  -classpath %CLASSPATH_PREFIX%;%CLASSPATH% -Dapp.name="pregelix" -Dapp.repo="%REPO%" -Dapp.home="%BASEDIR%" -Dbasedir="%BASEDIR%" org.apache.hadoop.util.RunJar %CMD_LINE_ARGS%

+if ERRORLEVEL 1 goto error

+goto end

+

+:error

+if "%OS%"=="Windows_NT" @endlocal

+set ERROR_CODE=%ERRORLEVEL%

+

+:end

+@REM set local scope for the variables with windows NT shell

+if "%OS%"=="Windows_NT" goto endNT

+

+@REM For old DOS remove the set variables from ENV - we assume they were not set

+@REM before we started - at least we don't leave any baggage around

+set CMD_LINE_ARGS=

+goto postExec

+

+:endNT

+@REM If error code is set to 1 then the endlocal was done already in :error.

+if %ERROR_CODE% EQU 0 @endlocal

+

+

+:postExec

+

+if "%FORCE_EXIT_ON_ERROR%" == "on" (

+  if %ERROR_CODE% NEQ 0 exit %ERROR_CODE%

+)

+

+exit /B %ERROR_CODE%
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/scripts/startAllNCs.sh b/pregelix-core/src/main/resources/scripts/startAllNCs.sh
new file mode 100644
index 0000000..64b7fcf
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startAllNCs.sh
@@ -0,0 +1,6 @@
+PREGELIX_PATH=`pwd`
+
+for i in `cat conf/slaves`
+do
+   ssh $i "cd ${PREGELIX_PATH}; echo `pwd`; bin/startnc.sh"
+done
diff --git a/pregelix-core/src/main/resources/scripts/startCluster.sh b/pregelix-core/src/main/resources/scripts/startCluster.sh
new file mode 100644
index 0000000..bc1e7c4
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startCluster.sh
@@ -0,0 +1,3 @@
+bin/startcc.sh
+sleep 10
+bin/startAllNCs.sh
diff --git a/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix-core/src/main/resources/scripts/startcc.sh
new file mode 100644
index 0000000..aac9e60
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CCHOST=`dig +short $CCHOST_NAME`
+
+#Remove the temp dir
+rm -rf $CCTMP_DIR
+mkdir $CCTMP_DIR
+
+#Remove the logs dir
+rm -rf $CCLOGS_DIR
+mkdir $CCLOGS_DIR
+
+#Export JAVA_HOME and JAVA_OPTS
+export JAVA_HOME=$JAVA_HOME
+export JAVA_OPTS=$CCJAVA_OPTS
+
+echo $JAVA_HOME
+echo $JAVA_OPTS
+
+#Launch hyracks cc script
+chmod -R 755 $HYRACKS_HOME
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 &> $CCLOGS_DIR/cc.log &
diff --git a/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix-core/src/main/resources/scripts/startnc.sh
new file mode 100644
index 0000000..1da6248
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -0,0 +1,47 @@
+hostname
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CCHOST=`dig +short $CCHOST_NAME`
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Clean up temp dir
+
+rm -rf $NCTMP_DIR
+mkdir $NCTMP_DIR
+
+#Clean up log dir
+rm -rf $NCLOGS_DIR
+mkdir $NCLOGS_DIR
+
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS | tr "," "\n")
+for io_dir in $io_dirs
+do
+	rm -rf $io_dir
+	mkdir $io_dir
+done
+
+#Set JAVA_HOME
+export JAVA_HOME=$JAVA_HOME
+
+#Get IP Address
+IPADDR=`/sbin/ifconfig eth0 | grep "inet addr" | awk '{print $2}' | cut -f 2 -d ':'`
+#HOSTNAME=`hostname`
+#echo $HOSTNAME
+#IPADDR=`dig +short ${HOSTNAME}`
+NODEID=`hostname | cut -d '.' -f 1`
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$NCJAVA_OPTS
+
+#Enter the temp dir
+cd $NCTMP_DIR
+
+#Launch hyracks nc
+chmod -R 755 $HYRACKS_HOME
+echo $HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" -frame-size $FRAME_SIZE
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
new file mode 100644
index 0000000..171495a
--- /dev/null
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -0,0 +1,597 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.join;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.core.util.TestUtils;
+import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
+import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
+import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
+
+public class JoinTest {
+    private final static String ACTUAL_RESULT_DIR = "actual";
+    private final static String EXPECT_RESULT_DIR = "expected";
+    private final static String ACTUAL_RESULT_FILE = ACTUAL_RESULT_DIR + File.separator + "join.txt";
+    private final static String EXPECTED_RESULT_FILE = EXPECT_RESULT_DIR + File.separator + "join.txt";
+    private final static String JOB_NAME = "JOIN_TEST";
+    private static final String HYRACKS_APP_NAME = "giraph";
+    private static final String NC1_ID = "nc1";
+    private static final String NC2_ID = "nc2";
+
+    private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/data.properties";
+
+    private static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+    private IIndexRegistryProvider<IIndex> treeRegistry = TreeIndexRegistryProvider.INSTANCE;
+    private IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+
+    private IBinaryHashFunctionFactory stringHashFactory = new PointableBinaryHashFunctionFactory(
+            UTF8StringPointable.FACTORY);
+    private IBinaryComparatorFactory stringComparatorFactory = new PointableBinaryComparatorFactory(
+            UTF8StringPointable.FACTORY);
+
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    @Test
+    public void customerOrderCIDJoinMulti() throws Exception {
+        ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+        cleanupStores();
+        PregelixHyracksIntegrationUtil.init();
+        PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+
+        FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(EXPECT_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        runCreate();
+        runBulkLoad();
+        runHashJoin();
+        runIndexJoin();
+        TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
+
+        FileUtils.cleanDirectory(new File(EXPECT_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        runLeftOuterHashJoin();
+        runIndexRightOuterJoin();
+        TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
+
+        PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
+        PregelixHyracksIntegrationUtil.deinit();
+    }
+
+    private void runHashJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { stringHashFactory },
+                new IBinaryComparatorFactory[] { stringComparatorFactory }, custOrderJoinDesc, 128);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        int[] sortFields = new int[2];
+        sortFields[0] = 1;
+        sortFields[1] = 0;
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+        comparatorFactories[0] = stringComparatorFactory;
+        comparatorFactories[1] = stringComparatorFactory;
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+                comparatorFactories, custOrderJoinDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(EXPECTED_RESULT_FILE)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+                null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }));
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }));
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, sorter, 0);
+        IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+        spec.connect(joinWriterConn, sorter, 0, writer, 0);
+
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
+    private void runCreate() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = stringComparatorFactory;
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+        ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+        for (int i = 0; i < typeTraits.length; i++)
+            typeTraits[i] = new TypeTraits(false);
+        TreeIndexCreateOperatorDescriptor writer = new TreeIndexCreateOperatorDescriptor(spec, storageManagerInterface,
+                treeRegistry, fileSplitProvider, typeTraits, comparatorFactories, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
+    private void runBulkLoad() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = stringComparatorFactory;
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+                comparatorFactories, custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+        int[] fieldPermutation = new int[custDesc.getFields().length];
+        for (int i = 0; i < fieldPermutation.length; i++)
+            fieldPermutation[i] = i;
+        ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+        for (int i = 0; i < typeTraits.length; i++)
+            typeTraits[i] = new TypeTraits(false);
+        TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, comparatorFactories,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
+                sorter, 0, writer, 0);
+
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
+    private void runIndexJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        /** sort operator */
+        int[] sortFields = new int[2];
+        sortFields[0] = 1;
+        sortFields[1] = 0;
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+        comparatorFactories[0] = stringComparatorFactory;
+        comparatorFactories[1] = stringComparatorFactory;
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+                comparatorFactories, ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        /** index join operator */
+        int[] keyFields = new int[1];
+        keyFields[0] = 1;
+        IBinaryComparatorFactory[] keyComparatorFactories = new IBinaryComparatorFactory[1];
+        keyComparatorFactories[0] = stringComparatorFactory;
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+        ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+        for (int i = 0; i < typeTraits.length; i++)
+            typeTraits[i] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
+                storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+                typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
+                new BTreeDataflowHelperFactory());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        /** results (already in sorted order) */
+        FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(ACTUAL_RESULT_FILE)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+                null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
+                sorter, 0, join, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
+                join, 0, writer, 0);
+
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
+    private void runLeftOuterHashJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[] { JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE };
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
+                new int[] { 1 }, new IBinaryHashFunctionFactory[] { stringHashFactory },
+                new IBinaryComparatorFactory[] { stringComparatorFactory }, custOrderJoinDesc, true,
+                nullWriterFactories, 128);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        int[] projectFields = new int[] { 8, 9, 10, 11, 12, 13, 14, 15, 16, 0, 1, 2, 3, 4, 5, 6, 7 };
+        ProjectOperatorDescriptor project = new ProjectOperatorDescriptor(spec, custOrderJoinDesc, projectFields);
+
+        int[] sortFields = new int[2];
+        sortFields[0] = 9;
+        sortFields[1] = 0;
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+        comparatorFactories[0] = stringComparatorFactory;
+        comparatorFactories[1] = stringComparatorFactory;
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+                comparatorFactories, custOrderJoinDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(EXPECTED_RESULT_FILE)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+                null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }));
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }));
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, project, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), project, 0, sorter, 0);
+        IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 9 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+        spec.connect(joinWriterConn, sorter, 0, writer, 0);
+
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
+    private void runIndexRightOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[] { JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE };
+
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        /** sort operator */
+        int[] sortFields = new int[2];
+        sortFields[0] = 1;
+        sortFields[1] = 0;
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+        comparatorFactories[0] = stringComparatorFactory;
+        comparatorFactories[1] = stringComparatorFactory;
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+                comparatorFactories, ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        /** index join operator */
+        int[] keyFields = new int[1];
+        keyFields[0] = 1;
+        IBinaryComparatorFactory[] keyComparatorFactories = new IBinaryComparatorFactory[1];
+        keyComparatorFactories[0] = stringComparatorFactory;
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+        ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+        for (int i = 0; i < typeTraits.length; i++)
+            typeTraits[i] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
+                storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+                typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
+                new BTreeDataflowHelperFactory(), true, nullWriterFactories);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        /** results (already in sorted order) */
+        FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(ACTUAL_RESULT_FILE)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+                null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                keyFields, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
+                        UTF8StringPointable.FACTORY) }), sortFields, comparatorFactories), sorter, 0, join, 0);
+
+        IBinaryComparatorFactory[] mergeComparatorFactories = new IBinaryComparatorFactory[2];
+        mergeComparatorFactories[0] = new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY);
+        mergeComparatorFactories[1] = new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY);
+        int[] mergeFields = new int[] { 9, 0 };
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                new int[] { 9 }, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
+                        UTF8StringPointable.FACTORY) }), mergeFields, comparatorFactories), join, 0, writer, 0);
+
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
+    private void runTest(JobSpecification spec) throws Exception {
+        PregelixHyracksIntegrationUtil.runJob(spec, HYRACKS_APP_NAME);
+    }
+}
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTestNullWriterFactory.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTestNullWriterFactory.java
new file mode 100644
index 0000000..8f2e546
--- /dev/null
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTestNullWriterFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.join;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+
+public class JoinTestNullWriterFactory implements INullWriterFactory {
+    private static final long serialVersionUID = 1L;
+    public static INullWriterFactory INSTANCE = new JoinTestNullWriterFactory();
+
+    @Override
+    public INullWriter createNullWriter() {
+        return new INullWriter() {
+
+            @Override
+            public void writeNull(DataOutput out) throws HyracksDataException {
+                UTF8StringSerializerDeserializer.INSTANCE.serialize("NULL", out);
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/util/TestUtils.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/util/TestUtils.java
new file mode 100644
index 0000000..83dd10d
--- /dev/null
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/util/TestUtils.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+
+public class TestUtils {
+
+    public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
+        BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+        BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
+        String lineExpected, lineActual;
+        int num = 1;
+        try {
+            while ((lineExpected = readerExpected.readLine()) != null) {
+                lineActual = readerActual.readLine();
+                // Assert.assertEquals(lineExpected, lineActual);
+                if (lineActual == null) {
+                    throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
+                }
+                if (!equalStrings(lineExpected, lineActual)) {
+                    throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+                            + lineActual);
+                }
+                ++num;
+            }
+            lineActual = readerActual.readLine();
+            if (lineActual != null) {
+                throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
+            }
+        } finally {
+            readerExpected.close();
+            readerActual.close();
+        }
+    }
+
+    private static boolean equalStrings(String s1, String s2) {
+        String[] rowsOne = s1.split("\n");
+        String[] rowsTwo = s2.split("\n");
+
+        if (rowsOne.length != rowsTwo.length)
+            return false;
+
+        for (int i = 0; i < rowsOne.length; i++) {
+            String row1 = rowsOne[i];
+            String row2 = rowsTwo[i];
+
+            if (row1.equals(row2))
+                continue;
+
+            String[] fields1 = row1.split(",");
+            String[] fields2 = row2.split(",");
+
+            for (int j = 0; j < fields1.length; j++) {
+                if (fields1[j].equals(fields2[j])) {
+                    continue;
+                } else if (fields1[j].indexOf('.') < 0) {
+                    return false;
+                } else {
+                    fields1[j] = fields1[j].split("=")[1];
+                    fields2[j] = fields2[j].split("=")[1];
+                    Double double1 = Double.parseDouble(fields1[j]);
+                    Double double2 = Double.parseDouble(fields2[j]);
+                    float float1 = (float) double1.doubleValue();
+                    float float2 = (float) double2.doubleValue();
+
+                    if (Math.abs(float1 - float2) == 0)
+                        continue;
+                    else {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+}
diff --git a/pregelix-core/src/test/resources/cluster/data.properties b/pregelix-core/src/test/resources/cluster/data.properties
new file mode 100644
index 0000000..daf881e
--- /dev/null
+++ b/pregelix-core/src/test/resources/cluster/data.properties
@@ -0,0 +1 @@
+store=teststore
\ No newline at end of file
diff --git a/pregelix-core/src/test/resources/hadoop/conf/core-site.xml b/pregelix-core/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..47dfac5
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+    <name>fs.default.name</name>
+    <value>hdfs://127.0.0.1:31888</value>
+</property>
+<property>
+    <name>hadoop.tmp.dir</name>
+    <value>/tmp/hadoop</value>
+</property>
+
+
+</configuration>
diff --git a/pregelix-core/src/test/resources/hadoop/conf/hdfs-site.xml b/pregelix-core/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..8d29b1d
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+   <name>dfs.replication</name>
+   <value>1</value>
+</property>
+
+<property>
+	<name>dfs.block.size</name>
+	<value>65536</value>
+</property>
+
+</configuration>
diff --git a/pregelix-core/src/test/resources/hadoop/conf/log4j.properties b/pregelix-core/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshhold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollver at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..1b9a4d6
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+  <property>
+    <name>mapred.job.tracker</name>
+    <value>localhost:29007</value>
+  </property>
+  <property>
+     <name>mapred.tasktracker.map.tasks.maximum</name>
+     <value>20</value>
+  </property>
+   <property>
+      <name>mapred.tasktracker.reduce.tasks.maximum</name>
+      <value>20</value>
+   </property>
+   <property>
+      <name>mapred.min.split.size</name>
+      <value>65536</value>
+   </property>
+
+</configuration>
diff --git a/pregelix-core/src/test/resources/hyracks-deployment.properties b/pregelix-core/src/test/resources/hyracks-deployment.properties
new file mode 100644
index 0000000..2ae9818
--- /dev/null
+++ b/pregelix-core/src/test/resources/hyracks-deployment.properties
@@ -0,0 +1,2 @@
+#cc.bootstrap.class=edu.uci.ics.asterix.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=edu.uci.ics.pregelix.runtime.bootstrap.NCBootstrapImpl
\ No newline at end of file
diff --git a/pregelix-core/src/test/resources/log4j.properties b/pregelix-core/src/test/resources/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/pregelix-core/src/test/resources/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshhold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollver at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/pregelix-core/src/test/resources/logging.properties b/pregelix-core/src/test/resources/logging.properties
new file mode 100644
index 0000000..f43eb05
--- /dev/null
+++ b/pregelix-core/src/test/resources/logging.properties
@@ -0,0 +1,66 @@
+############################################################
+#  	Default Logging Configuration File
+#
+# You can use a different file by specifying a filename
+# with the java.util.logging.config.file system property.  
+# For example java -Djava.util.logging.config.file=myfile
+############################################################
+
+############################################################
+#  	Global properties
+############################################################
+
+# "handlers" specifies a comma separated list of log Handler 
+# classes.  These handlers will be installed during VM startup.
+# Note that these classes must be on the system classpath.
+# By default we only configure a ConsoleHandler, which will only
+# show messages at the INFO and above levels.
+
+handlers= java.util.logging.ConsoleHandler
+
+# To also add the FileHandler, use the following line instead.
+
+# handlers= java.util.logging.FileHandler, java.util.logging.ConsoleHandler
+
+# Default global logging level.
+# This specifies which kinds of events are logged across
+# all loggers.  For any given facility this global level
+# can be overriden by a facility specific level
+# Note that the ConsoleHandler also has a separate level
+# setting to limit messages printed to the console.
+
+.level= WARNING
+# .level= INFO
+# .level= FINE
+# .level = FINEST
+
+############################################################
+# Handler specific properties.
+# Describes specific configuration info for Handlers.
+############################################################
+
+# default file output is in user's home directory.
+
+# java.util.logging.FileHandler.pattern = %h/java%u.log
+# java.util.logging.FileHandler.limit = 50000
+# java.util.logging.FileHandler.count = 1
+# java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter
+
+# Limit the message that are printed on the console to FINE and above.
+
+java.util.logging.ConsoleHandler.level = WARNING
+java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
+
+
+############################################################
+# Facility specific properties.
+# Provides extra control for each logger.
+############################################################
+
+# For example, set the com.xyz.foo logger to only log SEVERE
+# messages:
+
+#edu.uci.ics.asterix.level = FINE
+#edu.uci.ics.algebricks.level = FINE
+edu.uci.ics.hyracks.level = INFO
+#edu.uci.ics.hyracks.control.nc.net.level = FINE
\ No newline at end of file