add Pregelix codebase

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix-core/src/main/assembly/binary-assembly.xml b/pregelix-core/src/main/assembly/binary-assembly.xml
new file mode 100755
index 0000000..0500499
--- /dev/null
+++ b/pregelix-core/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
new file mode 100644
index 0000000..0a169e0
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.pregelix.core.base;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+public interface IDriver {
+
+    public static enum Plan {
+        INNER_JOIN, OUTER_JOIN, OUTER_JOIN_SORT, OUTER_JOIN_SINGLE_SORT
+    }
+
+    public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
+            throws HyracksException;
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
new file mode 100644
index 0000000..2a91e0b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.base;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public interface IJobGen {
+
+    public JobSpecification generateCreatingJob() throws HyracksException;
+
+    public JobSpecification generateLoadingJob() throws HyracksException;
+
+    public JobSpecification generateJob(int iteration) throws HyracksException;
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
new file mode 100644
index 0000000..d839f2a
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.pregelix.core.base;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public interface INormalizedKeyComputerFactoryProvider {
+
+    @SuppressWarnings("rawtypes")
+    INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory(Class keyClass);
+
+    @SuppressWarnings("rawtypes")
+    INormalizedKeyComputerFactory getDescINormalizedKeyComputerFactory(Class keyClass);
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/data/TypeTraits.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/data/TypeTraits.java
new file mode 100644
index 0000000..8cb2a50
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/data/TypeTraits.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.pregelix.core.data;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+
+public class TypeTraits implements ITypeTraits {
+
+    private static final long serialVersionUID = 1L;
+    private final int length;
+    private final boolean isFixedLength;
+
+    public TypeTraits(boolean isFixedLength) {
+        this.isFixedLength = isFixedLength;
+        this.length = 0;
+    }
+
+    public TypeTraits(int length) {
+        this.isFixedLength = true;
+        this.length = length;
+    }
+
+    @Override
+    public boolean isFixedLength() {
+        return isFixedLength;
+    }
+
+    @Override
+    public int getFixedLength() {
+        return length;
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
new file mode 100644
index 0000000..7076f7f
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -0,0 +1,220 @@
+package edu.uci.ics.pregelix.core.driver;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+@SuppressWarnings("rawtypes")
+public class Driver implements IDriver {
+    private static final Log LOG = LogFactory.getLog(Driver.class);
+    private static String LIB = "lib";
+    private JobGen jobGen;
+    private PregelixJob job;
+    private boolean profiling;
+
+    private String applicationName;
+    private String GIRAPH_HOME = "GIRAPH_HOME";
+    private IHyracksClientConnection hcc;
+
+    private Class exampleClass;
+
+    public Driver(Class exampleClass) {
+        this.exampleClass = exampleClass;
+    }
+
+    @Override
+    public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
+            throws HyracksException {
+        LOG.info("job started");
+        long start = System.currentTimeMillis();
+        long end = start;
+        long time = 0;
+
+        this.job = job;
+        this.profiling = profiling;
+        try {
+            switch (planChoice) {
+                case INNER_JOIN:
+                    jobGen = new JobGenInnerJoin(job);
+                    break;
+                case OUTER_JOIN:
+                    jobGen = new JobGenOuterJoin(job);
+                    break;
+                case OUTER_JOIN_SORT:
+                    jobGen = new JobGenOuterJoinSort(job);
+                    break;
+                case OUTER_JOIN_SINGLE_SORT:
+                    jobGen = new JobGenOuterJoinSingleSort(job);
+                    break;
+                default:
+                    jobGen = new JobGenInnerJoin(job);
+            }
+
+            if (hcc == null)
+                hcc = new HyracksConnection(ipAddress, port);
+            ClusterConfig.loadClusterConfig(ipAddress, port);
+
+            URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
+            URL[] urls = classLoader.getURLs();
+            String jarFile = "";
+            for (URL url : urls)
+                if (url.toString().endsWith(".jar"))
+                    jarFile = url.getPath();
+
+            installApplication(jarFile);
+            FileSystem dfs = FileSystem.get(job.getConfiguration());
+            dfs.delete(FileOutputFormat.getOutputPath(job), true);
+
+            runCreate(jobGen);
+            runDataLoad(jobGen);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("data loading finished " + time + "ms");
+            int i = 1;
+            boolean terminate = false;
+            do {
+                start = System.currentTimeMillis();
+                runLoopBodyIteration(jobGen, i);
+                end = System.currentTimeMillis();
+                time = end - start;
+                LOG.info("iteration " + i + " finished " + time + "ms");
+                terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
+                i++;
+            } while (!terminate);
+
+            start = System.currentTimeMillis();
+            runHDFSWRite(jobGen);
+            runCleanup(jobGen);
+            destroyApplication(applicationName);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("result writing finished " + time + "ms");
+            LOG.info("job finished");
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    private void runCreate(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
+            execute(treeCreateSpec);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runDataLoad(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
+            execute(bulkLoadJobSpec);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
+        try {
+            JobSpecification loopBody = jobGen.generateJob(iteration);
+            execute(loopBody);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runHDFSWRite(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
+            execute(scanSortPrintJobSpec);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runCleanup(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification[] cleanups = jobGen.generateCleanup();
+            runJobArray(cleanups);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runJobArray(JobSpecification[] jobs) throws Exception {
+        for (JobSpecification job : jobs) {
+            execute(job);
+        }
+    }
+
+    private void execute(JobSpecification job) throws Exception {
+        JobId jobId = hcc.startJob(applicationName, job,
+                profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+    }
+
+    public void installApplication(String jarFile) throws Exception {
+        System.out.println(jarFile);
+        applicationName = job.getJobName() + new UUID(System.currentTimeMillis(), System.nanoTime());
+        String home = System.getProperty(GIRAPH_HOME);
+        if (home == null)
+            home = "./";
+        String libDir = home + LIB;
+        File dir = new File(libDir);
+        if (!dir.isDirectory()) {
+            throw new HyracksException(libDir + " is not a directory!");
+        }
+        System.out.println(dir.getAbsolutePath());
+        File[] libJars = dir.listFiles(new FileFilter("jar"));
+        Set<String> allJars = new TreeSet<String>();
+        allJars.add(jarFile);
+        for (File jar : libJars) {
+            allJars.add(jar.getAbsolutePath());
+        }
+        File appZip = Utilities.getHyracksArchive(applicationName, allJars);
+        hcc.createApplication(applicationName, appZip);
+    }
+
+    public void destroyApplication(String jarFile) throws Exception {
+        hcc.destroyApplication(applicationName);
+    }
+
+}
+
+class FileFilter implements FilenameFilter {
+    private String ext;
+
+    public FileFilter(String ext) {
+        this.ext = "." + ext;
+    }
+
+    public boolean accept(File dir, String name) {
+        return name.endsWith(ext);
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
new file mode 100644
index 0000000..85f3e6b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.hadoop.config;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+
+public class ConfigurationFactory implements IConfigurationFactory {
+    private static final long serialVersionUID = 1L;
+    private final byte[] data;
+
+    public ConfigurationFactory(Configuration conf) {
+        try {
+            data = SerDeUtils.serialize(conf);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public Configuration createConfiguration() throws HyracksDataException {
+        try {
+            Configuration conf = new Configuration();
+            SerDeUtils.deserialize(conf, data);
+            conf.setClassLoader(this.getClass().getClassLoader());
+            return conf;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/Message.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/Message.java
new file mode 100644
index 0000000..38f12fe
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/Message.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.hadoop.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+public class Message<I extends Writable, M extends Writable> implements Writable {
+    private I receiverId;
+    private M body;
+    private Configuration conf;
+
+    public Message() {
+    }
+
+    public Message(I receiverId, M body) {
+        this.receiverId = receiverId;
+        this.body = body;
+    }
+
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        if (this.receiverId == null && this.body == null) {
+            setClass((Class<I>) BspUtils.getVertexIndexClass(getConf()),
+                    (Class<M>) BspUtils.getMessageValueClass(getConf()));
+        }
+        receiverId.readFields(input);
+        body.readFields(input);
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        receiverId.write(output);
+        body.write(output);
+    }
+
+    public I getReceiverVertexId() {
+        return receiverId;
+    }
+
+    public M getMessageBody() {
+        return body;
+    }
+
+    private void setClass(Class<I> idClass, Class<M> bodyClass) {
+        receiverId = ReflectionUtils.newInstance(idClass, getConf());
+        body = ReflectionUtils.newInstance(bodyClass, getConf());
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/MessageList.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/MessageList.java
new file mode 100644
index 0000000..880e75c
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/MessageList.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.hadoop.data;
+
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+
+@SuppressWarnings("rawtypes")
+public class MessageList extends ArrayListWritable<Message> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void setClass() {
+        setClass(Message.class);
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
new file mode 100644
index 0000000..9291e94
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -0,0 +1,441 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.ReflectionUtils;
+import edu.uci.ics.pregelix.core.base.IJobGen;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.jobgen.provider.NormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
+import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public abstract class JobGen implements IJobGen {
+    private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
+    protected static final int MB = 1048576;
+    protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+    protected static final int frameSize = 65536;
+    protected static final int maxFrameSize = (int) (((long) 32 * MB) / frameSize);
+    protected static final int tableSize = 10485767;
+    protected static final String PRIMARY_INDEX = "primary";
+    protected final Configuration conf;
+    protected final PregelixJob giraphJob;
+    protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+    protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+
+    protected static final String SECONDARY_INDEX_ODD = "secondary1";
+    protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+
+    public JobGen(PregelixJob job) {
+        this.conf = job.getConfiguration();
+        this.giraphJob = job;
+        this.initJobConfiguration();
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void initJobConfiguration() {
+        Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
+        List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
+        Type vertexIndexType = parameterTypes.get(0);
+        Type vertexValueType = parameterTypes.get(1);
+        Type edgeValueType = parameterTypes.get(2);
+        Type messageValueType = parameterTypes.get(3);
+        conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
+        conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
+        conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
+        conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public JobSpecification generateCreatingJob() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        JobSpecification spec = new JobSpecification();
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, btreeCreate);
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public JobSpecification generateLoadingJob() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        /**
+         * the graph file scan operator and use count constraint first, will use
+         * absolute constraint later
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+            LOGGER.info("number of splits: " + splits.size());
+            for (InputSplit split : splits)
+                LOGGER.info(split.toString());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+                confFactory);
+        ClusterConfig.setLocationConstraint(spec, scanner, splits);
+
+        /**
+         * construct sort operator
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+                .getAscINormalizedKeyComputerFactory(vertexIdClass);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameSize, sortFields,
+                nkmFactory, comparatorFactories, recordDescriptor);
+        ClusterConfig.setLocationConstraint(spec, sorter);
+
+        /**
+         * construct tree bulk load operator
+         */
+        int[] fieldPermutation = new int[2];
+        fieldPermutation[0] = 0;
+        fieldPermutation[1] = 1;
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+        return spec;
+    }
+
+    @Override
+    public JobSpecification generateJob(int iteration) throws HyracksException {
+        if (iteration <= 0)
+            throw new IllegalStateException("iteration number cannot be less than 1");
+        if (iteration == 1)
+            return generateFirstIteration(iteration);
+        else
+            return generateNonFirstIteration(iteration);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+        JobSpecification spec = new JobSpecification();
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        /**
+         * the graph file scan operator and use count constraint first, will use
+         * absolute constraint later
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+                confFactory);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner, splits.size());
+
+        /**
+         * construct sort operator
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+                .getAscINormalizedKeyComputerFactory(vertexIdClass);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
+                nkmFactory, comparatorFactories, recordDescriptor);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter, splits.size());
+
+        /**
+         * construct write file operator
+         */
+        FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
+                resultFileSplitProvider, preHookFactory, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+                comparatorFactories), sorter, 0, writer, 0);
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct btree search operator
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct write file operator
+         */
+        FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
+                resultFileSplitProvider, preHookFactory, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        /**
+         * connect operator descriptors
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+                comparatorFactories), scanner, 0, writer, 0);
+        spec.setFrameSize(frameSize);
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanIndexWriteGraph() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct btree search operator
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct write file operator
+         */
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+        ClusterConfig.setLocationConstraint(spec, writer);
+
+        /**
+         * connect operator descriptors
+         */
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+        return spec;
+    }
+
+    /***
+     * drop the sindex
+     * 
+     * @return JobSpecification
+     * @throws HyracksException
+     */
+    protected JobSpecification dropIndex(String indexName) throws HyracksException {
+        JobSpecification spec = new JobSpecification();
+
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+        TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
+                treeRegistryProvider, fileSplitProvider);
+
+        ClusterConfig.setLocationConstraint(spec, drop);
+        spec.addRoot(drop);
+        return spec;
+    }
+
+    /** generate non-first iteration job */
+    protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
+
+    /** generate first iteration job */
+    protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+
+    /** generate clean-up job */
+    public abstract JobSpecification[] generateCleanup() throws HyracksException;
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
new file mode 100644
index 0000000..12e1cd7
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -0,0 +1,418 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenInnerJoin extends JobGen {
+
+    public JobGenInnerJoin(PregelixJob job) {
+        super(job);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct drop index operator
+         */
+        IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        /**
+         * construct btree search operator
+         */
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+
+        /**
+         * construct compute operator
+         */
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+
+        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+                inputRdFactory, 3, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage,
+                rdDummy, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+        /**
+         * construct bulk-load index operator
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+                WritableComparator.get(vertexIdClass).getClass());
+        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProvider, interiorFrameFactory,
+                leafFrameFactory, typeTraits, indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
+                new BTreeDataflowHelperFactory());
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /**
+         * construct unnest operator
+         */
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        /**
+         * construct local sort operator
+         */
+        int[] keyFields = new int[] { 0 };
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /**
+         * do pre- & post- super step
+         */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 2, btreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+                localGby, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+        spec.addRoot(emptySink);
+        spec.addRoot(btreeBulkLoad);
+        spec.addRoot(terminateWriter);
+
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * source aggregate
+         */
+        int[] keyFields = new int[] { 0 };
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct pre-superstep
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+        /**
+         * construct the index-set-union operator
+         */
+        String readFile = iteration % 2 == 0 ? SECONDARY_INDEX_ODD : SECONDARY_INDEX_EVEN;
+        IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
+                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
+                leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
+                new BTreeDataflowHelperFactory(), true);
+        ClusterConfig.setLocationConstraint(spec, setUnion);
+
+        /**
+         * construct index-join-function-update operator
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+        IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, join);
+
+        /**
+         * construct bulk-load index operator
+         */
+        int fieldPermutation[] = new int[] { 0, 1 };
+        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+                WritableComparator.get(vertexIdClass).getClass());
+        String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
+        IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderWrite, interiorFrameFactory,
+                leafFrameFactory, typeTraits, indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
+                new BTreeDataflowHelperFactory());
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /**
+         * construct local sort operator
+         */
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, setUnion, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), setUnion, 0, join, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 2, btreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+                localGby, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+        spec.addRoot(emptySink);
+        spec.addRoot(btreeBulkLoad);
+        spec.addRoot(terminateWriter);
+
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    public JobSpecification[] generateCleanup() throws HyracksException {
+        JobSpecification[] cleanups = new JobSpecification[3];
+        cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+        cleanups[1] = this.dropIndex(SECONDARY_INDEX_ODD);
+        cleanups[2] = this.dropIndex(SECONDARY_INDEX_EVEN);
+        return cleanups;
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
new file mode 100644
index 0000000..e1944eb
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -0,0 +1,366 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MsgListNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenOuterJoin extends JobGen {
+
+    public JobGenOuterJoin(PregelixJob job) {
+        super(job);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct btree search operator
+         */
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+
+        /**
+         * construct compute operator
+         */
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+                inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        /**
+         * construct local sort operator
+         */
+        int[] keyFields = new int[] { 0 };
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global group-by operator
+         */
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink2);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+                localGby, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink2);
+
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * source aggregate
+         */
+        int[] keyFields = new int[] { 0 };
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct pre-superstep hook
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+        /**
+         * construct index join function update operator
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
+        nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
+        nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
+
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+        IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
+                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, join);
+
+        /**
+         * construct local sort operator
+         */
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+                localGby, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink);
+
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    public JobSpecification[] generateCleanup() throws HyracksException {
+        JobSpecification[] cleanups = new JobSpecification[1];
+        cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+        return cleanups;
+    }
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
new file mode 100644
index 0000000..80d11ab
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -0,0 +1,345 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.NonCombinerConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MsgListNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenOuterJoinSingleSort extends JobGen {
+
+    public JobGenOuterJoinSingleSort(PregelixJob job) {
+        super(job);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct btree search operator
+         */
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+
+        /**
+         * construct compute operator
+         */
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+                inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct global sort operator
+         */
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        int[] keyFields = new int[] { 0 };
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, globalSort);
+
+        /**
+         * construct global group-by operator
+         */
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink2);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 0, globalSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink2);
+
+        spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * source aggregate
+         */
+        int[] keyFields = new int[] { 0 };
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct pre-superstep hook
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+        /**
+         * construct index join function update operator
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
+        nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
+        nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
+
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+        IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
+                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, join);
+
+        /**
+         * construct global sort operator
+         */
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, globalSort);
+
+        /**
+         * construct global group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 0, globalSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink);
+
+        spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    public JobSpecification[] generateCleanup() throws HyracksException {
+        JobSpecification[] cleanups = new JobSpecification[1];
+        cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+        return cleanups;
+    }
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
new file mode 100644
index 0000000..f1f89b6
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -0,0 +1,375 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MsgListNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenOuterJoinSort extends JobGen {
+
+    public JobGenOuterJoinSort(PregelixJob job) {
+        super(job);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct btree search operator
+         */
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+
+        /**
+         * construct compute operator
+         */
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+                inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        /**
+         * construct local sort operator
+         */
+        int[] keyFields = new int[] { 0 };
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global sort operator
+         */
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, globalSort);
+
+        /**
+         * construct global group-by operator
+         */
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink2);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink2);
+
+        return spec;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * source aggregate
+         */
+        int[] keyFields = new int[] { 0 };
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MsgList.class.getName());
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct pre-superstep hook
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+        /**
+         * construct index join function update operator
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
+        nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
+        nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
+
+        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                MessageList.class.getName());
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+        IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
+                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+        ClusterConfig.setLocationConstraint(spec, join);
+
+        /**
+         * construct local sort operator
+         */
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+                .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localSort);
+
+        /**
+         * construct local pre-clustered group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, localGby);
+
+        /**
+         * construct global sort operator
+         */
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+        ClusterConfig.setLocationConstraint(spec, globalSort);
+
+        /**
+         * construct global group-by operator
+         */
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+                .getAccumulatingAggregatorFactory(conf, true);
+        PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, globalGby);
+
+        /**
+         * construct the materializing write operator
+         */
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, materialize);
+
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new PostSuperStepRuntimeHookFactory(jobId));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink);
+
+        /**
+         * termination state write operator
+         */
+        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+                configurationFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+                rdUnnestedMessage.getFields()[0]);
+
+        /** connect all operators **/
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+                terminateWriter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+        spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink);
+        return spec;
+    }
+
+    @Override
+    public JobSpecification[] generateCleanup() throws HyracksException {
+        JobSpecification[] cleanups = new JobSpecification[1];
+        cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+        return cleanups;
+    }
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
new file mode 100644
index 0000000..608980c
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
@@ -0,0 +1,46 @@
+package edu.uci.ics.pregelix.core.jobgen;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.pregelix.core.jobgen.provider.NormalizedKeyComputerFactoryProvider;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class JobGenUtil {
+
+    /**
+     * get normalized key factory for an iteration, to sort messages iteration
+     * 1: asc order iteration 2: desc order
+     * 
+     * @param iteration
+     * @param keyClass
+     * @return
+     */
+    public static INormalizedKeyComputerFactory getINormalizedKeyComputerFactory(int iteration, Class keyClass) {
+        return NormalizedKeyComputerFactoryProvider.INSTANCE.getAscINormalizedKeyComputerFactory(keyClass);
+    }
+
+    /**
+     * get comparator for an iteration, to sort messages iteration 1: asc order
+     * iteration 0: desc order
+     * 
+     * @param iteration
+     * @param keyClass
+     * @return
+     */
+    public static IBinaryComparatorFactory getIBinaryComparatorFactory(int iteration, Class keyClass) {
+        return new WritableComparingBinaryComparatorFactory(keyClass);
+    }
+
+    /**
+     * get the B-tree scan order for an iteration iteration 1: desc order,
+     * backward scan iteration 2: asc order, forward scan
+     * 
+     * @param iteration
+     * @return
+     */
+    public static boolean getForwardScan(int iteration) {
+        return true;
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
new file mode 100644
index 0000000..a22b6f4
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen.clusterconfig;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class ClusterConfig {
+
+    private static String[] NCs;
+    private static String propertiesPath = "conf/stores.properties";
+    private static Map<String, List<String>> ipToNcMapping;
+    private static String[] stores;
+
+    /**
+     * let tests set config path to be whatever
+     * 
+     * @param confPath
+     * @param propertiesPath
+     */
+    public static void setStorePath(String propertiesPath) {
+        ClusterConfig.propertiesPath = propertiesPath;
+    }
+
+    /**
+     * get NC names running on one IP address
+     * 
+     * @param ipAddress
+     * @return
+     * @throws HyracksDataException
+     */
+    public static List<String> getNCNames(String ipAddress) throws HyracksException {
+        return ipToNcMapping.get(ipAddress);
+    }
+
+    /**
+     * get file split provider
+     * 
+     * @param jobId
+     * @return
+     * @throws HyracksDataException
+     */
+    public static IFileSplitProvider getFileSplitProvider(String jobId, String indexName) throws HyracksException {
+        if (stores == null) {
+            loadStores();
+        }
+        FileSplit[] fileSplits = new FileSplit[stores.length * NCs.length];
+        int i = 0;
+        for (String nc : NCs) {
+            for (String st : stores) {
+                FileSplit split = new FileSplit(nc, st + File.separator + nc + "-data" + File.separator + jobId
+                        + File.separator + indexName);
+                fileSplits[i++] = split;
+            }
+        }
+        return new ConstantFileSplitProvider(fileSplits);
+    }
+
+    private static void loadStores() throws HyracksException {
+        Properties properties = new Properties();
+        try {
+            properties.load(new FileInputStream(propertiesPath));
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        String store = properties.getProperty("store");
+        stores = store.split(";");
+    }
+
+    /**
+     * set location constraint
+     * 
+     * @param spec
+     * @param operator
+     * @throws HyracksDataException
+     */
+    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator,
+            List<InputSplit> splits) throws HyracksException {
+        if (stores == null) {
+            loadStores();
+        }
+        int count = splits.size();
+        String[] locations = new String[splits.size()];
+        Random random = new Random(System.currentTimeMillis());
+        for (int i = 0; i < splits.size(); i++) {
+            try {
+                String[] loc = splits.get(i).getLocations();
+                Collections.shuffle(Arrays.asList(loc), random);
+                if (loc.length > 0) {
+                    InetAddress[] allIps = InetAddress.getAllByName(loc[0]);
+                    for (InetAddress ip : allIps) {
+                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+                            List<String> ncs = ipToNcMapping.get(ip.getHostAddress());
+                            int pos = random.nextInt(ncs.size());
+                            locations[i] = ncs.get(pos);
+                        } else {
+                            int pos = random.nextInt(NCs.length);
+                            locations[i] = NCs[pos];
+                        }
+                    }
+                } else {
+                    int pos = random.nextInt(NCs.length);
+                    locations[i] = NCs[pos];
+                }
+            } catch (IOException e) {
+                throw new HyracksException(e);
+            } catch (InterruptedException e) {
+                throw new HyracksException(e);
+            }
+        }
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
+    }
+
+    /**
+     * set location constraint
+     * 
+     * @param spec
+     * @param operator
+     * @throws HyracksDataException
+     */
+    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
+            throws HyracksException {
+        if (stores == null) {
+            loadStores();
+        }
+        int count = 0;
+        String[] locations = new String[NCs.length * stores.length];
+        for (String nc : NCs) {
+            for (int i = 0; i < stores.length; i++) {
+                locations[count] = nc;
+                count++;
+            }
+        }
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+    }
+
+    /**
+     * set location constraint
+     * 
+     * @param spec
+     * @param operator
+     * @throws HyracksDataException
+     */
+    public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {
+        if (stores == null) {
+            loadStores();
+        }
+        int count = NCs.length * stores.length;
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
+    }
+
+    public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
+        try {
+            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+            NCs = new String[ncNameToNcInfos.size()];
+            ipToNcMapping = new HashMap<String, List<String>>();
+            int i = 0;
+            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
+                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
+                        .getHostAddress();
+                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
+                if (matchedNCs == null) {
+                    matchedNCs = new ArrayList<String>();
+                    ipToNcMapping.put(ipAddr, matchedNCs);
+                }
+                matchedNCs.add(entry.getKey());
+                NCs[i] = entry.getKey();
+                i++;
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java
new file mode 100644
index 0000000..1c331c0
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.pregelix.core.jobgen.provider;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.pregelix.core.base.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.pregelix.runtime.touchpoint.VLongAscNormalizedKeyComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VLongDescNormalizedKeyComputerFactory;
+
+public class NormalizedKeyComputerFactoryProvider implements INormalizedKeyComputerFactoryProvider {
+
+    public static INormalizedKeyComputerFactoryProvider INSTANCE = new NormalizedKeyComputerFactoryProvider();
+
+    private NormalizedKeyComputerFactoryProvider() {
+
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory(Class keyClass) {
+        if (keyClass.getName().indexOf("VLongWritable") > 0)
+            return new VLongAscNormalizedKeyComputerFactory();
+        else
+            return null;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public INormalizedKeyComputerFactory getDescINormalizedKeyComputerFactory(Class keyClass) {
+        if (keyClass.getName().indexOf("VLongWritable") > 0)
+            return new VLongDescNormalizedKeyComputerFactory();
+        else
+            return null;
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
new file mode 100644
index 0000000..d1d927d
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+
+public class WritableRecordDescriptorFactory implements IRecordDescriptorFactory {
+    private static final long serialVersionUID = 1L;
+    private String[] fieldClasses;
+
+    public WritableRecordDescriptorFactory(String... fieldClasses) {
+        this.fieldClasses = fieldClasses;
+    }
+
+    @Override
+    public RecordDescriptor createRecordDescriptor() throws HyracksDataException {
+        try {
+            return DataflowUtils.getRecordDescriptorFromWritableClasses(fieldClasses);
+        } catch (HyracksException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/BufferSerDeUtils.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/BufferSerDeUtils.java
new file mode 100644
index 0000000..3f15197
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/BufferSerDeUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+public class BufferSerDeUtils {
+
+    public static double getDouble(byte[] bytes, int offset) {
+        return Double.longBitsToDouble(getLong(bytes, offset));
+    }
+
+    public static float getFloat(byte[] bytes, int offset) {
+        return Float.intBitsToFloat(getInt(bytes, offset));
+    }
+
+    public static boolean getBoolean(byte[] bytes, int offset) {
+        if (bytes[offset] == 0)
+            return false;
+        else
+            return true;
+    }
+
+    public static int getInt(byte[] bytes, int offset) {
+        return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
+                + ((bytes[offset + 3] & 0xff) << 0);
+    }
+
+    public static long getLong(byte[] bytes, int offset) {
+        return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
+                + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
+                + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
+                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+    }
+
+    public static void writeBoolean(boolean value, byte[] bytes, int offset) {
+        if (value)
+            bytes[offset] = (byte) 1;
+        else
+            bytes[offset] = (byte) 0;
+    }
+
+    public static void writeInt(int value, byte[] bytes, int offset) {
+        bytes[offset++] = (byte) (value >> 24);
+        bytes[offset++] = (byte) (value >> 16);
+        bytes[offset++] = (byte) (value >> 8);
+        bytes[offset++] = (byte) (value);
+    }
+
+    public static void writeLong(long value, byte[] bytes, int offset) {
+        bytes[offset++] = (byte) (value >> 56);
+        bytes[offset++] = (byte) (value >> 48);
+        bytes[offset++] = (byte) (value >> 40);
+        bytes[offset++] = (byte) (value >> 32);
+        bytes[offset++] = (byte) (value >> 24);
+        bytes[offset++] = (byte) (value >> 16);
+        bytes[offset++] = (byte) (value >> 8);
+        bytes[offset++] = (byte) (value);
+    }
+
+    public static void writeDouble(double value, byte[] bytes, int offset) {
+        long lValue = Double.doubleToLongBits(value);
+        writeLong(lValue, bytes, offset);
+    }
+
+    public static void writeFloat(float value, byte[] bytes, int offset) {
+        int iValue = Float.floatToIntBits(value);
+        writeInt(iValue, bytes, offset);
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataBalancer.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataBalancer.java
new file mode 100644
index 0000000..d0cff48
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataBalancer.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+@SuppressWarnings("deprecation")
+public class DataBalancer {
+
+    public static class MapRecordOnly extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
+
+        public void map(LongWritable id, Text inputValue, OutputCollector<LongWritable, Text> output, Reporter reporter)
+                throws IOException {
+            output.collect(id, inputValue);
+        }
+    }
+
+    public static class ReduceRecordOnly extends MapReduceBase implements
+            Reducer<LongWritable, Text, NullWritable, Text> {
+
+        NullWritable key = NullWritable.get();
+
+        public void reduce(LongWritable inputKey, Iterator<Text> inputValue,
+                OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+            while (inputValue.hasNext())
+                output.collect(key, inputValue.next());
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        JobConf job = new JobConf(DataBalancer.class);
+
+        job.setJobName(DataBalancer.class.getSimpleName());
+        job.setMapperClass(MapRecordOnly.class);
+        job.setReducerClass(ReduceRecordOnly.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(Text.class);
+
+        job.setInputFormat(TextInputFormat.class);
+        FileInputFormat.setInputPaths(job, args[0]);
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.setNumReduceTasks(Integer.parseInt(args[2]));
+
+        if (args.length > 3) {
+            if (args[3].startsWith("bzip"))
+                FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
+            if (args[3].startsWith("gz"))
+                FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        }
+        JobClient.runJob(job);
+    }
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataGenerator.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataGenerator.java
new file mode 100644
index 0000000..aa63edf
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataGenerator.java
@@ -0,0 +1,213 @@
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * generate graph data from a base dataset
+ * 
+ */
+@SuppressWarnings("deprecation")
+public class DataGenerator {
+
+    public static class MapMaxId extends MapReduceBase implements
+            Mapper<LongWritable, Text, NullWritable, VLongWritable> {
+        private NullWritable key = NullWritable.get();
+        private VLongWritable value = new VLongWritable();
+
+        @Override
+        public void map(LongWritable id, Text inputValue, OutputCollector<NullWritable, VLongWritable> output,
+                Reporter reporter) throws IOException {
+            String[] vertices = inputValue.toString().split(" ");
+            long max = Long.parseLong(vertices[0]);
+            for (int i = 1; i < vertices.length; i++) {
+                long vid = Long.parseLong(vertices[i]);
+                if (vid > max)
+                    max = vid;
+            }
+            value.set(max);
+            output.collect(key, value);
+        }
+    }
+
+    public static class ReduceMaxId extends MapReduceBase implements
+            Reducer<NullWritable, VLongWritable, NullWritable, Text> {
+
+        private NullWritable key = NullWritable.get();
+        private long max = Long.MIN_VALUE;
+        private OutputCollector<NullWritable, Text> output;
+
+        @Override
+        public void reduce(NullWritable inputKey, Iterator<VLongWritable> inputValue,
+                OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+            while (inputValue.hasNext()) {
+                long vid = inputValue.next().get();
+                if (vid > max)
+                    max = vid;
+            }
+            if (this.output == null)
+                this.output = output;
+
+        }
+
+        @Override
+        public void close() throws IOException {
+            output.collect(key, new Text(new VLongWritable(max).toString()));
+        }
+    }
+
+    public static class CombineMaxId extends MapReduceBase implements
+            Reducer<NullWritable, VLongWritable, NullWritable, VLongWritable> {
+
+        private NullWritable key = NullWritable.get();
+        private long max = Long.MIN_VALUE;
+        private OutputCollector<NullWritable, VLongWritable> output;
+
+        @Override
+        public void reduce(NullWritable inputKey, Iterator<VLongWritable> inputValue,
+                OutputCollector<NullWritable, VLongWritable> output, Reporter reporter) throws IOException {
+            while (inputValue.hasNext()) {
+                long vid = inputValue.next().get();
+                if (vid > max)
+                    max = vid;
+            }
+            if (this.output == null)
+                this.output = output;
+        }
+
+        public void close() throws IOException {
+            output.collect(key, new VLongWritable(max));
+        }
+    }
+
+    public static class MapRecordGen extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
+
+        private long maxId = 0;
+        private Text text = new Text();
+        private int x = 2;
+
+        @Override
+        public void configure(JobConf conf) {
+            try {
+                x = conf.getInt("hyracks.x", 2);
+                String fileName = conf.get("hyracks.maxid.file");
+                FileSystem dfs = FileSystem.get(conf);
+                dfs.delete(new Path(fileName + "/_SUCCESS"), true);
+                dfs.delete(new Path(fileName + "/_logs"), true);
+                FileStatus[] files = dfs.listStatus(new Path(fileName));
+
+                for (int i = 0; i < files.length; i++) {
+                    if (!files[i].isDir()) {
+                        DataInputStream input = dfs.open(files[i].getPath());
+                        String id = input.readLine();
+                        maxId = Long.parseLong(id) + 1;
+                        input.close();
+                    }
+                }
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        @Override
+        public void map(LongWritable id, Text inputValue, OutputCollector<LongWritable, Text> output, Reporter reporter)
+                throws IOException {
+            String[] vertices = inputValue.toString().split(" ");
+
+            /**
+             * generate data x times
+             */
+            for (int k = 0; k < x; k++) {
+                long max = maxId * k;
+                StringBuilder sb = new StringBuilder();
+                for (int i = 0; i < vertices.length - 1; i++) {
+                    long vid = Long.parseLong(vertices[i]) + max;
+                    sb.append(vid);
+                    sb.append(" ");
+                }
+                long vid = Long.parseLong(vertices[vertices.length - 1]) + max;
+                sb.append(vid);
+                text.set(sb.toString().getBytes());
+                output.collect(id, text);
+            }
+        }
+    }
+
+    public static class ReduceRecordGen extends MapReduceBase implements
+            Reducer<LongWritable, Text, NullWritable, Text> {
+
+        private NullWritable key = NullWritable.get();
+
+        public void reduce(LongWritable inputKey, Iterator<Text> inputValue,
+                OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+            while (inputValue.hasNext())
+                output.collect(key, inputValue.next());
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        JobConf job = new JobConf(DataGenerator.class);
+        FileSystem dfs = FileSystem.get(job);
+        String maxFile = "/maxtemp";
+        dfs.delete(new Path(maxFile), true);
+
+        job.setJobName(DataGenerator.class.getSimpleName() + "max ID");
+        job.setMapperClass(MapMaxId.class);
+        job.setCombinerClass(CombineMaxId.class);
+        job.setReducerClass(ReduceMaxId.class);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(VLongWritable.class);
+
+        job.setInputFormat(TextInputFormat.class);
+        FileInputFormat.setInputPaths(job, args[0]);
+        FileOutputFormat.setOutputPath(job, new Path(maxFile));
+        job.setNumReduceTasks(1);
+        JobClient.runJob(job);
+
+        job = new JobConf(DataGenerator.class);
+        job.set("hyracks.maxid.file", maxFile);
+        job.setInt("hyracks.x", Integer.parseInt(args[2]));
+        dfs.delete(new Path(args[1]), true);
+
+        job.setJobName(DataGenerator.class.getSimpleName());
+        job.setMapperClass(MapRecordGen.class);
+        job.setReducerClass(ReduceRecordGen.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(Text.class);
+
+        job.setInputFormat(TextInputFormat.class);
+        FileInputFormat.setInputPaths(job, args[0]);
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.setNumReduceTasks(Integer.parseInt(args[3]));
+
+        if (args.length > 4) {
+            if (args[4].startsWith("bzip"))
+                FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
+            if (args[4].startsWith("gz"))
+                FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        }
+        JobClient.runJob(job);
+    }
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
new file mode 100644
index 0000000..f17802b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.simpleagg.AccumulatingAggregatorFactory;
+import edu.uci.ics.pregelix.runtime.simpleagg.AggregationFunctionFactory;
+
+public class DataflowUtils {
+
+    public enum AggregationMode {
+        PARTIAL, FINAL
+    }
+
+    @SuppressWarnings("unchecked")
+    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(String className1, String className2)
+            throws HyracksException {
+        RecordDescriptor recordDescriptor = null;
+        try {
+            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                    (Class<? extends Writable>) Class.forName(className1),
+                    (Class<? extends Writable>) Class.forName(className2));
+        } catch (ClassNotFoundException cnfe) {
+            throw new HyracksException(cnfe);
+        }
+        return recordDescriptor;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public static RecordDescriptor getRecordDescriptorFromWritableClasses(String... classNames) throws HyracksException {
+        RecordDescriptor recordDescriptor = null;
+        ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
+        try {
+            int i = 0;
+            for (String className : classNames)
+                serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) Class
+                        .forName(className));
+        } catch (ClassNotFoundException cnfe) {
+            throw new HyracksException(cnfe);
+        }
+        recordDescriptor = new RecordDescriptor(serdes);
+        return recordDescriptor;
+    }
+
+    public static IRecordDescriptorFactory getWritableRecordDescriptorFactoryFromWritableClasses(String... classNames)
+            throws HyracksException {
+        IRecordDescriptorFactory rdFactory = new WritableRecordDescriptorFactory(classNames);
+        return rdFactory;
+    }
+
+    public static IAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf, boolean isFinal) {
+        IAggregateFunctionFactory aggFuncFactory = new AggregationFunctionFactory(new ConfigurationFactory(conf),
+                isFinal);
+        IAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
+                new IAggregateFunctionFactory[] { aggFuncFactory }, new int[] { 0 }, new int[] {});
+        return aggregatorFactory;
+    }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
new file mode 100644
index 0000000..ed04746
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+
+public class PregelixHyracksIntegrationUtil {
+
+    public static final String NC1_ID = "nc1";
+    public static final String NC2_ID = "nc2";
+
+    public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
+    public static final int TEST_HYRACKS_CC_PORT = 1099;
+    public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
+    public static final String CC_HOST = "localhost";
+
+    public static final int FRAME_SIZE = 65536;
+
+    private static ClusterControllerService cc;
+    private static NodeControllerService nc1;
+    private static NodeControllerService nc2;
+    private static IHyracksClientConnection hcc;
+
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.clientNetIpAddress = CC_HOST;
+        ccConfig.clusterNetIpAddress = CC_HOST;
+        ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
+        ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
+        ccConfig.defaultMaxJobAttempts = 0;
+        ccConfig.jobHistorySize = 10;
+
+        // cluster controller
+        cc = new ClusterControllerService(ccConfig);
+        cc.start();
+
+        // two node controllers
+        NCConfig ncConfig1 = new NCConfig();
+        ncConfig1.ccHost = "localhost";
+        ncConfig1.clusterNetIPAddress = "localhost";
+        ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
+        ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.nodeId = NC1_ID;
+        nc1 = new NodeControllerService(ncConfig1);
+        nc1.start();
+
+        NCConfig ncConfig2 = new NCConfig();
+        ncConfig2.ccHost = "localhost";
+        ncConfig2.clusterNetIPAddress = "localhost";
+        ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
+        ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.nodeId = NC2_ID;
+        nc2 = new NodeControllerService(ncConfig2);
+        nc2.start();
+
+        // hyracks connection
+        hcc = new HyracksConnection(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
+        ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
+    }
+
+    public static void destroyApp(String hyracksAppName) throws Exception {
+        hcc.destroyApplication(hyracksAppName);
+    }
+
+    public static void createApp(String hyracksAppName) throws Exception {
+        hcc.createApplication(hyracksAppName, null);
+    }
+
+    public static void deinit() throws Exception {
+        nc2.stop();
+        nc1.stop();
+        cc.stop();
+    }
+
+    public static void runJob(JobSpecification spec, String appName) throws Exception {
+        spec.setFrameSize(FRAME_SIZE);
+        JobId jobId = hcc.startJob(appName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        hcc.waitForCompletion(jobId);
+    }
+
+}
diff --git a/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java b/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
new file mode 100644
index 0000000..1fdfde6
--- /dev/null
+++ b/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Names a file or directory in a {@link FileSystem}. Path strings use slash as
+ * the directory separator. A path string is absolute if it begins with a slash.
+ */
+@SuppressWarnings("rawtypes")
+public class Path implements Comparable, Serializable {
+    private static final long serialVersionUID = 1L;
+    /** The directory separator, a slash. */
+    public static final String SEPARATOR = "/";
+    public static final char SEPARATOR_CHAR = '/';
+
+    public static final String CUR_DIR = ".";
+
+    static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
+
+    private URI uri; // a hierarchical uri
+
+    /** Resolve a child path against a parent path. */
+    public Path(String parent, String child) {
+        this(new Path(parent), new Path(child));
+    }
+
+    /** Resolve a child path against a parent path. */
+    public Path(Path parent, String child) {
+        this(parent, new Path(child));
+    }
+
+    /** Resolve a child path against a parent path. */
+    public Path(String parent, Path child) {
+        this(new Path(parent), child);
+    }
+
+    /** Resolve a child path against a parent path. */
+    public Path(Path parent, Path child) {
+        // Add a slash to parent's path so resolution is compatible with URI's
+        URI parentUri = parent.uri;
+        String parentPath = parentUri.getPath();
+        if (!(parentPath.equals("/") || parentPath.equals("")))
+            try {
+                parentUri = new URI(parentUri.getScheme(), parentUri.getAuthority(), parentUri.getPath() + "/", null,
+                        parentUri.getFragment());
+            } catch (URISyntaxException e) {
+                throw new IllegalArgumentException(e);
+            }
+        URI resolved = parentUri.resolve(child.uri);
+        initialize(resolved.getScheme(), resolved.getAuthority(), normalizePath(resolved.getPath()),
+                resolved.getFragment());
+    }
+
+    private void checkPathArg(String path) {
+        // disallow construction of a Path from an empty string
+        if (path == null) {
+            throw new IllegalArgumentException("Can not create a Path from a null string");
+        }
+        if (path.length() == 0) {
+            throw new IllegalArgumentException("Can not create a Path from an empty string");
+        }
+    }
+
+    /**
+     * Construct a path from a String. Path strings are URIs, but with unescaped
+     * elements and some additional normalization.
+     */
+    public Path(String pathString) {
+        checkPathArg(pathString);
+
+        // We can't use 'new URI(String)' directly, since it assumes things are
+        // escaped, which we don't require of Paths.
+
+        // add a slash in front of paths with Windows drive letters
+        if (hasWindowsDrive(pathString, false))
+            pathString = "/" + pathString;
+
+        // parse uri components
+        String scheme = null;
+        String authority = null;
+
+        int start = 0;
+
+        // parse uri scheme, if any
+        int colon = pathString.indexOf(':');
+        int slash = pathString.indexOf('/');
+        if ((colon != -1) && ((slash == -1) || (colon < slash))) { // has a
+                                                                   // scheme
+            scheme = pathString.substring(0, colon);
+            start = colon + 1;
+        }
+
+        // parse uri authority, if any
+        if (pathString.startsWith("//", start) && (pathString.length() - start > 2)) { // has
+                                                                                       // authority
+            int nextSlash = pathString.indexOf('/', start + 2);
+            int authEnd = nextSlash > 0 ? nextSlash : pathString.length();
+            authority = pathString.substring(start + 2, authEnd);
+            start = authEnd;
+        }
+
+        // uri path is the rest of the string -- query & fragment not supported
+        String path = pathString.substring(start, pathString.length());
+
+        initialize(scheme, authority, path, null);
+    }
+
+    /** Construct a Path from components. */
+    public Path(String scheme, String authority, String path) {
+        checkPathArg(path);
+        initialize(scheme, authority, path, null);
+    }
+
+    /**
+     * Construct a path from a URI
+     */
+    public Path(URI aUri) {
+        uri = aUri;
+    }
+
+    private void initialize(String scheme, String authority, String path, String fragment) {
+        try {
+            this.uri = new URI(scheme, authority, normalizePath(path), null, fragment).normalize();
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    private String normalizePath(String path) {
+        // remove double slashes & backslashes
+        if (path.indexOf("//") != -1) {
+            path = path.replace("//", "/");
+        }
+        if (path.indexOf("\\") != -1) {
+            path = path.replace("\\", "/");
+        }
+
+        // trim trailing slash from non-root path (ignoring windows drive)
+        int minLength = hasWindowsDrive(path, true) ? 4 : 1;
+        if (path.length() > minLength && path.endsWith("/")) {
+            path = path.substring(0, path.length() - 1);
+        }
+
+        return path;
+    }
+
+    private boolean hasWindowsDrive(String path, boolean slashed) {
+        if (!WINDOWS)
+            return false;
+        int start = slashed ? 1 : 0;
+        return path.length() >= start + 2
+                && (slashed ? path.charAt(0) == '/' : true)
+                && path.charAt(start + 1) == ':'
+                && ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
+                        .charAt(start) <= 'z'));
+    }
+
+    /** Convert this to a URI. */
+    public URI toUri() {
+        return uri;
+    }
+
+    /** Return the FileSystem that owns this Path. */
+    public FileSystem getFileSystem(Configuration conf) throws IOException {
+        return FileSystem.get(this.toUri(), conf);
+    }
+
+    /** True if the directory of this path is absolute. */
+    public boolean isAbsolute() {
+        int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
+        return uri.getPath().startsWith(SEPARATOR, start);
+    }
+
+    /** Returns the final component of this path. */
+    public String getName() {
+        String path = uri.getPath();
+        int slash = path.lastIndexOf(SEPARATOR);
+        return path.substring(slash + 1);
+    }
+
+    /** Returns the parent of a path or null if at root. */
+    public Path getParent() {
+        String path = uri.getPath();
+        int lastSlash = path.lastIndexOf('/');
+        int start = hasWindowsDrive(path, true) ? 3 : 0;
+        if ((path.length() == start) || // empty path
+                (lastSlash == start && path.length() == start + 1)) { // at root
+            return null;
+        }
+        String parent;
+        if (lastSlash == -1) {
+            parent = CUR_DIR;
+        } else {
+            int end = hasWindowsDrive(path, true) ? 3 : 0;
+            parent = path.substring(0, lastSlash == end ? end + 1 : lastSlash);
+        }
+        return new Path(uri.getScheme(), uri.getAuthority(), parent);
+    }
+
+    /** Adds a suffix to the final name in the path. */
+    public Path suffix(String suffix) {
+        return new Path(getParent(), getName() + suffix);
+    }
+
+    public String toString() {
+        // we can't use uri.toString(), which escapes everything, because we
+        // want
+        // illegal characters unescaped in the string, for glob processing, etc.
+        StringBuffer buffer = new StringBuffer();
+        if (uri.getScheme() != null) {
+            buffer.append(uri.getScheme());
+            buffer.append(":");
+        }
+        if (uri.getAuthority() != null) {
+            buffer.append("//");
+            buffer.append(uri.getAuthority());
+        }
+        if (uri.getPath() != null) {
+            String path = uri.getPath();
+            if (path.indexOf('/') == 0 && hasWindowsDrive(path, true) && // has
+                                                                         // windows
+                                                                         // drive
+                    uri.getScheme() == null && // but no scheme
+                    uri.getAuthority() == null) // or authority
+                path = path.substring(1); // remove slash before drive
+            buffer.append(path);
+        }
+        if (uri.getFragment() != null) {
+            buffer.append("#");
+            buffer.append(uri.getFragment());
+        }
+        return buffer.toString();
+    }
+
+    public boolean equals(Object o) {
+        if (!(o instanceof Path)) {
+            return false;
+        }
+        Path that = (Path) o;
+        return this.uri.equals(that.uri);
+    }
+
+    public int hashCode() {
+        return uri.hashCode();
+    }
+
+    public int compareTo(Object o) {
+        Path that = (Path) o;
+        return this.uri.compareTo(that.uri);
+    }
+
+    /** Return the number of elements in this path. */
+    public int depth() {
+        String path = uri.getPath();
+        int depth = 0;
+        int slash = path.length() == 1 && path.charAt(0) == '/' ? -1 : 0;
+        while (slash != -1) {
+            depth++;
+            slash = path.indexOf(SEPARATOR, slash + 1);
+        }
+        return depth;
+    }
+
+    /** Returns a qualified path object. */
+    public Path makeQualified(FileSystem fs) {
+        Path path = this;
+        if (!isAbsolute()) {
+            path = new Path(fs.getWorkingDirectory(), this);
+        }
+
+        URI pathUri = path.toUri();
+        URI fsUri = fs.getUri();
+
+        String scheme = pathUri.getScheme();
+        String authority = pathUri.getAuthority();
+        String fragment = pathUri.getFragment();
+        if (scheme != null && (authority != null || fsUri.getAuthority() == null))
+            return path;
+
+        if (scheme == null) {
+            scheme = fsUri.getScheme();
+        }
+
+        if (authority == null) {
+            authority = fsUri.getAuthority();
+            if (authority == null) {
+                authority = "";
+            }
+        }
+
+        URI newUri = null;
+        try {
+            newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+        return new Path(newUri);
+    }
+
+    /** Returns a qualified path object. */
+    public Path makeQualified(URI defaultUri, Path workingDir) {
+        Path path = this;
+        if (!isAbsolute()) {
+            path = new Path(workingDir, this);
+        }
+
+        URI pathUri = path.toUri();
+
+        String scheme = pathUri.getScheme();
+        String authority = pathUri.getAuthority();
+        String fragment = pathUri.getFragment();
+
+        if (scheme != null && (authority != null || defaultUri.getAuthority() == null))
+            return path;
+
+        if (scheme == null) {
+            scheme = defaultUri.getScheme();
+        }
+
+        if (authority == null) {
+            authority = defaultUri.getAuthority();
+            if (authority == null) {
+                authority = "";
+            }
+        }
+
+        URI newUri = null;
+        try {
+            newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+        return new Path(newUri);
+    }
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java b/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
new file mode 100644
index 0000000..6643ab5
--- /dev/null
+++ b/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * <code>InputSplit</code> represents the data to be processed by an individual
+ * {@link Mapper}.
+ * 
+ * <p>
+ * Typically, it presents a byte-oriented view on the input and is the
+ * responsibility of {@link RecordReader} of the job to process this and present
+ * a record-oriented view.
+ * 
+ * @see InputFormat
+ * @see RecordReader
+ */
+public abstract class InputSplit implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Get the size of the split, so that the input splits can be sorted by
+     * size.
+     * 
+     * @return the number of bytes in the split
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public abstract long getLength() throws IOException, InterruptedException;
+
+    /**
+     * Get the list of nodes by name where the data for the split would be
+     * local. The locations do not need to be serialized.
+     * 
+     * @return a new array of the node nodes.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public abstract String[] getLocations() throws IOException, InterruptedException;
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/conf/cluster.properties b/pregelix-core/src/main/resources/conf/cluster.properties
new file mode 100644
index 0000000..74a98b0
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/cluster.properties
@@ -0,0 +1,37 @@
+#The CC port for Hyracks clients
+CC_CLIENTPORT=3099
+
+#The CC port for Hyracks cluster management
+CC_CLUSTERPORT=1099
+
+#The directory of hyracks binaries
+HYRACKS_HOME=~/workspace/hyracks_asterix_stabilization
+
+#The tmp directory for cc to install jars
+CCTMP_DIR=/tmp/t1
+
+#The tmp directory for nc to install jars
+NCTMP_DIR=/tmp/t2
+
+#The directory to put cc logs
+CCLOGS_DIR=$CCTMP_DIR/logs
+
+#The directory to put nc logs
+NCLOGS_DIR=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS="/tmp/t3,/tmp/t4"
+
+#The JAVA_HOME
+JAVA_HOME=$JAVA_HOME
+
+#The frame size of the internal dataflow engine
+FRAME_SIZE=65536
+
+#CC JAVA_OPTS
+CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx3g -Djava.util.logging.config.file=logging.properties"
+# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
+
+#NC JAVA_OPTS
+NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx3g -Djava.util.logging.config.file=logging.properties"
+
diff --git a/pregelix-core/src/main/resources/conf/master b/pregelix-core/src/main/resources/conf/master
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/master
@@ -0,0 +1 @@
+localhost
diff --git a/pregelix-core/src/main/resources/conf/slaves b/pregelix-core/src/main/resources/conf/slaves
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/slaves
@@ -0,0 +1 @@
+localhost
diff --git a/pregelix-core/src/main/resources/conf/stores.properties b/pregelix-core/src/main/resources/conf/stores.properties
new file mode 100644
index 0000000..d1a4e10
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/stores.properties
@@ -0,0 +1,2 @@
+#Comma separated directories for storing the partitioned graph on each machine
+store=/tmp/teststore1,/tmp/teststore2
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/hyracks-deployment.properties b/pregelix-core/src/main/resources/hyracks-deployment.properties
new file mode 100644
index 0000000..8c84699
--- /dev/null
+++ b/pregelix-core/src/main/resources/hyracks-deployment.properties
@@ -0,0 +1,17 @@
+#-------------------------------------------------------------------------------
+# /*
+#  * Copyright 2009-2010 by The Regents of the University of California
+#  * Licensed under the Apache License, Version 2.0 (the "License");
+#  * you may not use this file except in compliance with the License.
+#  * you may obtain a copy of the License from
+#  * 
+#  *     http://www.apache.org/licenses/LICENSE-2.0
+#  * 
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  */
+#-------------------------------------------------------------------------------
+nc.bootstrap.class=edu.uci.ics.pregelix.runtime.bootstrap.NCBootstrapImpl
diff --git a/pregelix-core/src/main/resources/scripts/pregelix b/pregelix-core/src/main/resources/scripts/pregelix
new file mode 100644
index 0000000..275175d
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/pregelix
@@ -0,0 +1,115 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+#  Copyright 2001-2006 The Apache Software Foundation.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+# ----------------------------------------------------------------------------
+#
+#   Copyright (c) 2001-2006 The Apache Software Foundation.  All rights
+#   reserved.
+
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+  ls=`ls -ld "$PRG"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+PRGDIR=`dirname "$PRG"`
+BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
+
+
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  Darwin*) darwin=true
+           if [ -z "$JAVA_VERSION" ] ; then
+             JAVA_VERSION="CurrentJDK"
+           else
+             echo "Using Java version: $JAVA_VERSION"
+           fi
+           if [ -z "$JAVA_HOME" ] ; then
+             JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
+           fi
+           ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=`java-config --jre-home`
+  fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# If a specific java binary isn't specified search for the standard 'java' binary
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD=`which java`
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." 1>&2
+  echo "  We cannot execute $JAVACMD" 1>&2
+  exit 1
+fi
+
+if [ -z "$REPO" ]
+then
+  REPO="$BASEDIR"/lib
+fi
+
+cp $BASEDIR"/../a-hadoop-patch.jar "$REPO"/
+
+CLASSPATH=$CLASSPATH_PREFIX:"$BASEDIR"/etc:"$REPO"/a-hadoop-patch.jar:"$REPO"/pregelix-api-0.0.1-SNAPSHOT.jar:"$REPO"/hyracks-dataflow-common-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-api-0.2.2-SNAPSHOT.jar:"$REPO"/json-20090211.jar:"$REPO"/httpclient-4.1-alpha2.jar:"$REPO"/httpcore-4.1-beta1.jar:"$REPO"/commons-logging-1.1.1.jar:"$REPO"/commons-codec-1.3.jar:"$REPO"/args4j-2.0.12.jar:"$REPO"/hyracks-ipc-0.2.2-SNAPSHOT.jar:"$REPO"/commons-lang3-3.1.jar:"$REPO"/hyracks-data-std-0.2.2-SNAPSHOT.jar:"$REPO"/hadoop-core-0.20.2.jar:"$REPO"/commons-cli-1.2.jar:"$REPO"/xmlenc-0.52.jar:"$REPO"/commons-httpclient-3.0.1.jar:"$REPO"/commons-net-1.4.1.jar:"$REPO"/oro-2.0.8.jar:"$REPO"/jetty-6.1.14.jar:"$REPO"/jetty-util-6.1.14.jar:"$REPO"/servlet-api-2.5-6.1.14.jar:"$REPO"/jasper-runtime-5.5.12.jar:"$REPO"/jasper-compiler-5.5.12.jar:"$REPO"/jsp-api-2.1-6.1.14.jar:"$REPO"/jsp-2.1-6.1.14.jar:"$REPO"/core-3.1.1.jar:"$REPO"/ant-1.6.5.jar:"$REPO"/commons-el-1.0.jar:"$REPO"/jets3t-0.7.1.jar:"$REPO"/kfs-0.3.jar:"$REPO"/hsqldb-1.8.0.10.jar:"$REPO"/pregelix-dataflow-std-0.0.1-SNAPSHOT.jar:"$REPO"/pregelix-dataflow-std-base-0.0.1-SNAPSHOT.jar:"$REPO"/hyracks-dataflow-std-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-dataflow-hadoop-0.2.2-SNAPSHOT.jar:"$REPO"/dcache-client-0.0.1.jar:"$REPO"/jetty-client-8.0.0.M0.jar:"$REPO"/jetty-http-8.0.0.RC0.jar:"$REPO"/jetty-io-8.0.0.RC0.jar:"$REPO"/jetty-util-8.0.0.RC0.jar:"$REPO"/hyracks-storage-am-common-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-storage-common-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-storage-am-btree-0.2.2-SNAPSHOT.jar:"$REPO"/btreehelper-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-control-cc-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-control-common-0.2.2-SNAPSHOT.jar:"$REPO"/commons-io-1.3.1.jar:"$REPO"/jetty-server-8.0.0.RC0.jar:"$REPO"/servlet-api-3.0.20100224.jar:"$REPO"/jetty-continuation-8.0.0.RC0.jar:"$REPO"/jetty-webapp-8.0.0.RC0.jar:"$REPO"/jetty-xml-8.0.0.RC0.jar:"$REPO"/jetty-servlet-8.0.0.RC0.jar:"$REPO"/jetty-security-8.0.0.RC0.jar:"$REPO"/wicket-core-1.5.2.jar:"$REPO"/wicket-util-1.5.2.jar:"$REPO"/slf4j-api-1.6.1.jar:"$REPO"/wicket-request-1.5.2.jar:"$REPO"/slf4j-jcl-1.6.3.jar:"$REPO"/hyracks-control-nc-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-net-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-hadoop-compat-0.2.2-SNAPSHOT.jar:"$REPO"/pregelix-dataflow-0.0.1-SNAPSHOT.jar:"$REPO"/pregelix-runtime-0.0.1-SNAPSHOT.jar:"$REPO"/hadoop-test-0.20.2.jar:"$REPO"/ftplet-api-1.0.0.jar:"$REPO"/mina-core-2.0.0-M5.jar:"$REPO"/ftpserver-core-1.0.0.jar:"$REPO"/ftpserver-deprecated-1.0.0-M2.jar:"$REPO"/javax.servlet-api-3.0.1.jar:"$REPO"/pregelix-core-0.0.1-SNAPSHOT.jar
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+  [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
+  [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
+  [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
+fi
+
+exec "$JAVACMD" $JAVA_OPTS  \
+  -classpath "$CLASSPATH" \
+  -Dapp.name="pregelix" \
+  -Dapp.pid="$$" \
+  -Dapp.repo="$REPO" \
+  -Dapp.home="$BASEDIR" \
+  -Dbasedir="$BASEDIR" \
+  org.apache.hadoop.util.RunJar \
+  "$@"
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/scripts/pregelix.bat b/pregelix-core/src/main/resources/scripts/pregelix.bat
new file mode 100644
index 0000000..536e3c8
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/pregelix.bat
@@ -0,0 +1,110 @@
+@REM ----------------------------------------------------------------------------

+@REM  Copyright 2001-2006 The Apache Software Foundation.

+@REM

+@REM  Licensed under the Apache License, Version 2.0 (the "License");

+@REM  you may not use this file except in compliance with the License.

+@REM  You may obtain a copy of the License at

+@REM

+@REM       http://www.apache.org/licenses/LICENSE-2.0

+@REM

+@REM  Unless required by applicable law or agreed to in writing, software

+@REM  distributed under the License is distributed on an "AS IS" BASIS,

+@REM  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+@REM  See the License for the specific language governing permissions and

+@REM  limitations under the License.

+@REM ----------------------------------------------------------------------------

+@REM

+@REM   Copyright (c) 2001-2006 The Apache Software Foundation.  All rights

+@REM   reserved.

+

+@echo off

+

+set ERROR_CODE=0

+

+:init

+@REM Decide how to startup depending on the version of windows

+

+@REM -- Win98ME

+if NOT "%OS%"=="Windows_NT" goto Win9xArg

+

+@REM set local scope for the variables with windows NT shell

+if "%OS%"=="Windows_NT" @setlocal

+

+@REM -- 4NT shell

+if "%eval[2+2]" == "4" goto 4NTArgs

+

+@REM -- Regular WinNT shell

+set CMD_LINE_ARGS=%*

+goto WinNTGetScriptDir

+

+@REM The 4NT Shell from jp software

+:4NTArgs

+set CMD_LINE_ARGS=%$

+goto WinNTGetScriptDir

+

+:Win9xArg

+@REM Slurp the command line arguments.  This loop allows for an unlimited number

+@REM of arguments (up to the command line limit, anyway).

+set CMD_LINE_ARGS=

+:Win9xApp

+if %1a==a goto Win9xGetScriptDir

+set CMD_LINE_ARGS=%CMD_LINE_ARGS% %1

+shift

+goto Win9xApp

+

+:Win9xGetScriptDir

+set SAVEDIR=%CD%

+%0\

+cd %0\..\.. 

+set BASEDIR=%CD%

+cd %SAVEDIR%

+set SAVE_DIR=

+goto repoSetup

+

+:WinNTGetScriptDir

+set BASEDIR=%~dp0\..

+

+:repoSetup

+

+

+if "%JAVACMD%"=="" set JAVACMD=java

+

+if "%REPO%"=="" set REPO=%BASEDIR%\lib

+

+cp $BASEDIR"\..\a-hadoop-patch.jar "$REPO"\

+

+set CLASSPATH="%BASEDIR%"\etc;"%REPO%"\a-hadoop-patch.jar;"%REPO%"\pregelix-api-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-api-0.2.2-SNAPSHOT.jar;"%REPO%"\json-20090211.jar;"%REPO%"\httpclient-4.1-alpha2.jar;"%REPO%"\httpcore-4.1-beta1.jar;"%REPO%"\commons-logging-1.1.1.jar;"%REPO%"\commons-codec-1.3.jar;"%REPO%"\args4j-2.0.12.jar;"%REPO%"\hyracks-ipc-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-lang3-3.1.jar;"%REPO%"\hyracks-data-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hadoop-core-0.20.2.jar;"%REPO%"\commons-cli-1.2.jar;"%REPO%"\xmlenc-0.52.jar;"%REPO%"\commons-httpclient-3.0.1.jar;"%REPO%"\commons-net-1.4.1.jar;"%REPO%"\oro-2.0.8.jar;"%REPO%"\jetty-6.1.14.jar;"%REPO%"\jetty-util-6.1.14.jar;"%REPO%"\servlet-api-2.5-6.1.14.jar;"%REPO%"\jasper-runtime-5.5.12.jar;"%REPO%"\jasper-compiler-5.5.12.jar;"%REPO%"\jsp-api-2.1-6.1.14.jar;"%REPO%"\jsp-2.1-6.1.14.jar;"%REPO%"\core-3.1.1.jar;"%REPO%"\ant-1.6.5.jar;"%REPO%"\commons-el-1.0.jar;"%REPO%"\jets3t-0.7.1.jar;"%REPO%"\kfs-0.3.jar;"%REPO%"\hsqldb-1.8.0.10.jar;"%REPO%"\pregelix-dataflow-std-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-std-base-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-hadoop-0.2.2-SNAPSHOT.jar;"%REPO%"\dcache-client-0.0.1.jar;"%REPO%"\jetty-client-8.0.0.M0.jar;"%REPO%"\jetty-http-8.0.0.RC0.jar;"%REPO%"\jetty-io-8.0.0.RC0.jar;"%REPO%"\jetty-util-8.0.0.RC0.jar;"%REPO%"\hyracks-storage-am-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-am-btree-0.2.2-SNAPSHOT.jar;"%REPO%"\btreehelper-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-cc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-common-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-io-1.3.1.jar;"%REPO%"\jetty-server-8.0.0.RC0.jar;"%REPO%"\servlet-api-3.0.20100224.jar;"%REPO%"\jetty-continuation-8.0.0.RC0.jar;"%REPO%"\jetty-webapp-8.0.0.RC0.jar;"%REPO%"\jetty-xml-8.0.0.RC0.jar;"%REPO%"\jetty-servlet-8.0.0.RC0.jar;"%REPO%"\jetty-security-8.0.0.RC0.jar;"%REPO%"\wicket-core-1.5.2.jar;"%REPO%"\wicket-util-1.5.2.jar;"%REPO%"\slf4j-api-1.6.1.jar;"%REPO%"\wicket-request-1.5.2.jar;"%REPO%"\slf4j-jcl-1.6.3.jar;"%REPO%"\hyracks-control-nc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-net-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-hadoop-compat-0.2.2-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-runtime-0.0.1-SNAPSHOT.jar;"%REPO%"\hadoop-test-0.20.2.jar;"%REPO%"\ftplet-api-1.0.0.jar;"%REPO%"\mina-core-2.0.0-M5.jar;"%REPO%"\ftpserver-core-1.0.0.jar;"%REPO%"\ftpserver-deprecated-1.0.0-M2.jar;"%REPO%"\javax.servlet-api-3.0.1.jar;"%REPO%"\pregelix-core-0.0.1-SNAPSHOT.jar

+goto endInit

+

+@REM Reaching here means variables are defined and arguments have been captured

+:endInit

+

+%JAVACMD% %JAVA_OPTS%  -classpath %CLASSPATH_PREFIX%;%CLASSPATH% -Dapp.name="pregelix" -Dapp.repo="%REPO%" -Dapp.home="%BASEDIR%" -Dbasedir="%BASEDIR%" org.apache.hadoop.util.RunJar %CMD_LINE_ARGS%

+if ERRORLEVEL 1 goto error

+goto end

+

+:error

+if "%OS%"=="Windows_NT" @endlocal

+set ERROR_CODE=%ERRORLEVEL%

+

+:end

+@REM set local scope for the variables with windows NT shell

+if "%OS%"=="Windows_NT" goto endNT

+

+@REM For old DOS remove the set variables from ENV - we assume they were not set

+@REM before we started - at least we don't leave any baggage around

+set CMD_LINE_ARGS=

+goto postExec

+

+:endNT

+@REM If error code is set to 1 then the endlocal was done already in :error.

+if %ERROR_CODE% EQU 0 @endlocal

+

+

+:postExec

+

+if "%FORCE_EXIT_ON_ERROR%" == "on" (

+  if %ERROR_CODE% NEQ 0 exit %ERROR_CODE%

+)

+

+exit /B %ERROR_CODE%
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/scripts/startAllNCs.sh b/pregelix-core/src/main/resources/scripts/startAllNCs.sh
new file mode 100644
index 0000000..64b7fcf
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startAllNCs.sh
@@ -0,0 +1,6 @@
+PREGELIX_PATH=`pwd`
+
+for i in `cat conf/slaves`
+do
+   ssh $i "cd ${PREGELIX_PATH}; echo `pwd`; bin/startnc.sh"
+done
diff --git a/pregelix-core/src/main/resources/scripts/startCluster.sh b/pregelix-core/src/main/resources/scripts/startCluster.sh
new file mode 100644
index 0000000..bc1e7c4
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startCluster.sh
@@ -0,0 +1,3 @@
+bin/startcc.sh
+sleep 10
+bin/startAllNCs.sh
diff --git a/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix-core/src/main/resources/scripts/startcc.sh
new file mode 100644
index 0000000..aac9e60
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CCHOST=`dig +short $CCHOST_NAME`
+
+#Remove the temp dir
+rm -rf $CCTMP_DIR
+mkdir $CCTMP_DIR
+
+#Remove the logs dir
+rm -rf $CCLOGS_DIR
+mkdir $CCLOGS_DIR
+
+#Export JAVA_HOME and JAVA_OPTS
+export JAVA_HOME=$JAVA_HOME
+export JAVA_OPTS=$CCJAVA_OPTS
+
+echo $JAVA_HOME
+echo $JAVA_OPTS
+
+#Launch hyracks cc script
+chmod -R 755 $HYRACKS_HOME
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 &> $CCLOGS_DIR/cc.log &
diff --git a/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix-core/src/main/resources/scripts/startnc.sh
new file mode 100644
index 0000000..1da6248
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -0,0 +1,47 @@
+hostname
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CCHOST=`dig +short $CCHOST_NAME`
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Clean up temp dir
+
+rm -rf $NCTMP_DIR
+mkdir $NCTMP_DIR
+
+#Clean up log dir
+rm -rf $NCLOGS_DIR
+mkdir $NCLOGS_DIR
+
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS | tr "," "\n")
+for io_dir in $io_dirs
+do
+	rm -rf $io_dir
+	mkdir $io_dir
+done
+
+#Set JAVA_HOME
+export JAVA_HOME=$JAVA_HOME
+
+#Get IP Address
+IPADDR=`/sbin/ifconfig eth0 | grep "inet addr" | awk '{print $2}' | cut -f 2 -d ':'`
+#HOSTNAME=`hostname`
+#echo $HOSTNAME
+#IPADDR=`dig +short ${HOSTNAME}`
+NODEID=`hostname | cut -d '.' -f 1`
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$NCJAVA_OPTS
+
+#Enter the temp dir
+cd $NCTMP_DIR
+
+#Launch hyracks nc
+chmod -R 755 $HYRACKS_HOME
+echo $HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" -frame-size $FRAME_SIZE
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &