Merge branch 'master' into yingyi/fullstack_fix
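
This merge pulls master's fixes into yingyi/fullstack_fix:

* HyracksExecutionEngine.compileJob now returns 2 when the translated plan is
  not a map-reduce DAG, and Driver marks a query as hivesterix-compiled only
  when compileJob returns 0, so such statements fall back to normal Hive
  execution.
* hive-default.xml gains the </property> close tag that was missing after the
  reducer-count description.
* getip.sh on Mac now probes en0 first, then en1, then the loopback interface.
* Vertex.allocateMessage indexes the message pool with usedMessage instead of
  usedEdge, fixing reuse of pooled messages.
* A new abstract VertexPartitioner class, plus the PARTITIONER_CLASS and
  UPDATE_INTENSIVE job properties, lets users customize vertex partitioning.

For illustration only (not part of this change set): a user-defined partitioner
could look roughly like the sketch below. ModuloVertexPartitioner and the job
name are hypothetical, and since this diff is truncated before any dedicated
PregelixJob setter appears, registration goes through the raw PARTITIONER_CLASS
configuration key defined in PregelixJob.

    import org.apache.hadoop.io.LongWritable;
    import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
    import edu.uci.ics.pregelix.api.job.PregelixJob;

    /** Hypothetical example: assign each vertex by id modulo the partition count. */
    public class ModuloVertexPartitioner extends VertexPartitioner<LongWritable> {
        @Override
        public int getPartitionId(LongWritable vertexId, int nPartitions) {
            // mask the sign bit so negative ids still yield a valid partition
            return (int) ((vertexId.get() & Long.MAX_VALUE) % nPartitions);
        }
    }

    // Registration (PregelixJob's constructor may throw IOException):
    PregelixJob job = new PregelixJob("example-job");
    job.getConfiguration().setClass(PregelixJob.PARTITIONER_CLASS,
            ModuloVertexPartitioner.class, VertexPartitioner.class);
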
diff --git a/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index 0c2416d..d3bcaca 100644
--- a/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -12,598 +12,601 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hivesterix.runtime.exec;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MapRedTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-
-import edu.uci.ics.hivesterix.common.config.ConfUtil;
-import edu.uci.ics.hivesterix.logical.expression.HiveExpressionTypeComputer;
-import edu.uci.ics.hivesterix.logical.expression.HiveMergeAggregationExpressionFactory;
-import edu.uci.ics.hivesterix.logical.expression.HiveNullableTypeComputer;
-import edu.uci.ics.hivesterix.logical.expression.HivePartialAggregationTypeComputer;
-import edu.uci.ics.hivesterix.logical.plan.HiveAlgebricksTranslator;
-import edu.uci.ics.hivesterix.logical.plan.HiveLogicalPlanAndMetaData;
-import edu.uci.ics.hivesterix.optimizer.rulecollections.HiveRuleCollections;
-import edu.uci.ics.hivesterix.runtime.factory.evaluator.HiveExpressionRuntimeProvider;
-import edu.uci.ics.hivesterix.runtime.factory.nullwriter.HiveNullWriterFactory;
-import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryBooleanInspectorFactory;
-import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryIntegerInspectorFactory;
-import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy.Policy;
-import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryComparatorFactoryProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFactoryProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFamilyProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HiveNormalizedKeyComputerFactoryProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HivePrinterFactoryProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HiveSerializerDeserializerProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HiveTypeTraitProvider;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder;
-import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder.DefaultOptimizationContextFactory;
-import edu.uci.ics.hyracks.algebricks.compiler.api.ICompiler;
-import edu.uci.ics.hyracks.algebricks.compiler.api.ICompilerFactory;
-import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController;
-import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
-import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
-import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class HyracksExecutionEngine implements IExecutionEngine {
-
-    private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());
-    private static final String clusterPropertiesPath = "conf/cluster.properties";
-    private static final String masterFilePath = "conf/master";
-
-    private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
-    private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
-    static {
-        SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);
-        SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(true);
-        SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
-        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
-                HiveRuleCollections.NORMALIZATION));
-        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
-                HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));
-        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
-                HiveRuleCollections.LOAD_FIELDS));
-        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
-                HiveRuleCollections.OP_PUSHDOWN));
-        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
-                HiveRuleCollections.DATA_EXCHANGE));
-        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
-                HiveRuleCollections.CONSOLIDATION));
-
-        DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
-                HiveRuleCollections.PHYSICAL_PLAN_REWRITES));
-        DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
-                HiveRuleCollections.prepareJobGenRules));
-    }
-
-    /**
-     * static configurations for compiler
-     */
-    private HeuristicCompilerFactoryBuilder builder;
-
-    /**
-     * compiler
-     */
-    private ICompiler compiler;
-
-    /**
-     * physical optimization config
-     */
-    private PhysicalOptimizationConfig physicalOptimizationConfig;
-
-    /**
-     * final ending operators
-     */
-    private List<Operator> leaveOps = new ArrayList<Operator>();
-
-    /**
-     * tasks that are already visited
-     */
-    private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();
-
-    /**
-     * hyracks job spec
-     */
-    private JobSpecification jobSpec;
-
-    /**
-     * hive configuration
-     */
-    private HiveConf conf;
-
-    /**
-     * plan printer
-     */
-    private PrintWriter planPrinter;
-
-    /**
-     * properties
-     */
-    private Properties clusterProps;
-
-    /**
-     * the Hyracks client connection
-     */
-    private IHyracksClientConnection hcc;
-
-    public HyracksExecutionEngine(HiveConf conf) {
-        this.conf = conf;
-        init(conf);
-    }
-
-    public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {
-        this.conf = conf;
-        this.planPrinter = planPrinter;
-        init(conf);
-    }
-
-    private void init(HiveConf conf) {
-        builder = new HeuristicCompilerFactoryBuilder(DefaultOptimizationContextFactory.INSTANCE);
-        builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);
-        builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);
-        builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);
-        builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);
-        builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);
-
-        long memSizeExternalGby = conf.getLong("hive.algebricks.groupby.external.memory", 268435456);
-        long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory", 536870912);
-        int frameSize = conf.getInt("hive.algebricks.framesize", 32768);
-
-        physicalOptimizationConfig = new PhysicalOptimizationConfig();
-        int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);
-        physicalOptimizationConfig.setMaxFramesExternalGroupBy(frameLimitExtGby);
-        int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);
-        physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);
-        builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);
-    }
-
-    @Override
-    public int compileJob(List<Task<? extends Serializable>> rootTasks) {
-        // clean up
-        leaveOps.clear();
-        tasksVisited.clear();
-        jobSpec = null;
-
-        HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();
-        List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);
-
-        // get all leave Ops
-        getLeaves(rootOps, leaveOps);
-
-        HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();
-        try {
-            translator.translate(rootOps, null, aliasToPath);
-
-            ILogicalPlan plan = translator.genLogicalPlan();
-
-            if (plan.getRoots() != null && plan.getRoots().size() > 0 && plan.getRoots().get(0).getValue() != null) {
-                translator.printOperators();
-                ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(plan,
-                        translator.getMetadataProvider());
-
-                ICompilerFactory compilerFactory = builder.create();
-                compiler = compilerFactory.createCompiler(planAndMetadata.getPlan(),
-                        planAndMetadata.getMetadataProvider(), translator.getVariableCounter());
-
-                // run optimization and re-writing rules for Hive plan
-                compiler.optimize();
-
-                // print optimized plan
-                LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
-                StringBuilder buffer = new StringBuilder();
-                PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
-                String planStr = buffer.toString();
-                System.out.println(planStr);
-
-                if (planPrinter != null)
-                    planPrinter.print(planStr);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            return 1;
-        }
-
-        return 0;
-    }
-
-    private void codeGen() throws AlgebricksException {
-        try {
-            // number of cpu cores in the cluster
-            builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(ConfUtil.getNCs()));
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-        // builder.setClusterTopology(ConfUtil.getClusterTopology());
-        builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);
-        builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);
-        builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);
-        builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);
-        builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);
-        builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);
-        builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);
-        builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);
-        builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);
-        builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);
-        builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);
-        builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);
-
-        jobSpec = compiler.createJob(null, null);
-
-        // set the policy
-        String policyStr = conf.get("hive.hyracks.connectorpolicy");
-        if (policyStr == null)
-            policyStr = "PIPELINING";
-        Policy policyValue = Policy.valueOf(policyStr);
-        jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(policyValue));
-        jobSpec.setUseConnectorPolicyForScheduling(false);
-    }
-
-    @Override
-    public int executeJob() {
-        try {
-            codeGen();
-            executeHyracksJob(jobSpec);
-        } catch (Exception e) {
-            e.printStackTrace();
-            return 1;
-        }
-        return 0;
-    }
-
-    private List<Operator> generateRootOperatorDAG(List<Task<? extends Serializable>> rootTasks,
-            HashMap<String, PartitionDesc> aliasToPath) {
-
-        List<Operator> rootOps = new ArrayList<Operator>();
-        List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();
-        tasksVisited.clear();
-
-        for (int i = rootTasks.size() - 1; i >= 0; i--) {
-            /**
-             * list of map-reduce tasks
-             */
-            Task<? extends Serializable> task = rootTasks.get(i);
-
-            if (task instanceof MapRedTask) {
-                List<Operator> mapRootOps = articulateMapReduceOperators(task, rootOps, aliasToPath, rootTasks);
-                if (i == 0)
-                    rootOps.addAll(mapRootOps);
-                else {
-                    List<Operator> leaves = new ArrayList<Operator>();
-                    getLeaves(rootOps, leaves);
-
-                    List<Operator> mapChildren = new ArrayList<Operator>();
-                    for (Operator childMap : mapRootOps) {
-                        if (childMap instanceof TableScanOperator) {
-                            TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
-                            if (topDesc == null)
-                                mapChildren.add(childMap);
-                            else {
-                                rootOps.add(childMap);
-                            }
-                        } else
-                            mapChildren.add(childMap);
-                    }
-
-                    if (mapChildren.size() > 0) {
-                        for (Operator leaf : leaves)
-                            leaf.setChildOperators(mapChildren);
-                        for (Operator child : mapChildren)
-                            child.setParentOperators(leaves);
-                    }
-                }
-
-                MapredWork mr = (MapredWork) task.getWork();
-                HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
-
-                addAliasToPartition(aliasToPath, map);
-                toDelete.add(task);
-            }
-        }
-
-        for (Task<? extends Serializable> task : toDelete)
-            rootTasks.remove(task);
-
-        return rootOps;
-    }
-
-    private void addAliasToPartition(HashMap<String, PartitionDesc> aliasToPath, HashMap<String, PartitionDesc> map) {
-        Iterator<String> keys = map.keySet().iterator();
-        while (keys.hasNext()) {
-            String key = keys.next();
-            PartitionDesc part = map.get(key);
-            String[] names = key.split(":");
-            for (String name : names) {
-                aliasToPath.put(name, part);
-            }
-        }
-    }
-
-    private List<Operator> articulateMapReduceOperators(Task task, List<Operator> rootOps,
-            HashMap<String, PartitionDesc> aliasToPath, List<Task<? extends Serializable>> rootTasks) {
-        // System.out.println("!"+task.getName());
-        if (!(task instanceof MapRedTask)) {
-            if (!(task instanceof ConditionalTask)) {
-                rootTasks.add(task);
-                return null;
-            } else {
-                // remove map-reduce branches in condition task
-                ConditionalTask condition = (ConditionalTask) task;
-                List<Task<? extends Serializable>> branches = condition.getListTasks();
-                for (int i = branches.size() - 1; i >= 0; i--) {
-                    Task branch = branches.get(i);
-                    if (branch instanceof MapRedTask) {
-                        return articulateMapReduceOperators(branch, rootOps, aliasToPath, rootTasks);
-                    }
-                }
-                rootTasks.add(task);
-                return null;
-            }
-        }
-
-        MapredWork mr = (MapredWork) task.getWork();
-        HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
-
-        // put all aliasToParitionDesc mapping into the map
-        addAliasToPartition(aliasToPath, map);
-
-        MapRedTask mrtask = (MapRedTask) task;
-        MapredWork work = (MapredWork) mrtask.getWork();
-        HashMap<String, Operator<? extends Serializable>> operators = work.getAliasToWork();
-
-        Set entries = operators.entrySet();
-        Iterator<Entry<String, Operator>> iterator = entries.iterator();
-        List<Operator> mapRootOps = new ArrayList<Operator>();
-
-        // get map root operators
-        while (iterator.hasNext()) {
-            Operator next = iterator.next().getValue();
-            if (!mapRootOps.contains(next)) {
-                // clear that only for the case of union
-                mapRootOps.add(next);
-            }
-        }
-
-        // get map local work
-        MapredLocalWork localWork = work.getMapLocalWork();
-        if (localWork != null) {
-            HashMap<String, Operator<? extends Serializable>> localOperators = localWork.getAliasToWork();
-
-            Set localEntries = localOperators.entrySet();
-            Iterator<Entry<String, Operator>> localIterator = localEntries.iterator();
-            while (localIterator.hasNext()) {
-                mapRootOps.add(localIterator.next().getValue());
-            }
-
-            HashMap<String, FetchWork> localFetch = localWork.getAliasToFetchWork();
-            Set localFetchEntries = localFetch.entrySet();
-            Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries.iterator();
-            while (localFetchIterator.hasNext()) {
-                Entry<String, FetchWork> fetchMap = localFetchIterator.next();
-                FetchWork fetch = fetchMap.getValue();
-                String alias = fetchMap.getKey();
-                List<PartitionDesc> dirPart = fetch.getPartDesc();
-
-                // temporary hack: put the first partitionDesc into the map
-                aliasToPath.put(alias, dirPart.get(0));
-            }
-        }
-
-        Boolean visited = tasksVisited.get(task);
-        if (visited != null && visited.booleanValue() == true) {
-            return mapRootOps;
-        }
-
-        // do that only for union operator
-        for (Operator op : mapRootOps)
-            if (op.getParentOperators() != null)
-                op.getParentOperators().clear();
-
-        List<Operator> mapLeaves = new ArrayList<Operator>();
-        downToLeaves(mapRootOps, mapLeaves);
-        List<Operator> reduceOps = new ArrayList<Operator>();
-
-        if (work.getReducer() != null)
-            reduceOps.add(work.getReducer());
-
-        for (Operator mapLeaf : mapLeaves) {
-            mapLeaf.setChildOperators(reduceOps);
-        }
-
-        for (Operator reduceOp : reduceOps) {
-            if (reduceOp != null)
-                reduceOp.setParentOperators(mapLeaves);
-        }
-
-        List<Operator> leafs = new ArrayList<Operator>();
-        if (reduceOps.size() > 0) {
-            downToLeaves(reduceOps, leafs);
-        } else {
-            leafs = mapLeaves;
-        }
-
-        List<Operator> mapChildren = new ArrayList<Operator>();
-        if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {
-            for (Object child : task.getChildTasks()) {
-                List<Operator> childMapOps = articulateMapReduceOperators((Task) child, rootOps, aliasToPath, rootTasks);
-                if (childMapOps == null)
-                    continue;
-
-                for (Operator childMap : childMapOps) {
-                    if (childMap instanceof TableScanOperator) {
-                        TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
-                        if (topDesc == null)
-                            mapChildren.add(childMap);
-                        else {
-                            rootOps.add(childMap);
-                        }
-                    } else {
-                        // if not table scan, add the child
-                        mapChildren.add(childMap);
-                    }
-                }
-            }
-
-            if (mapChildren.size() > 0) {
-                int i = 0;
-                for (Operator leaf : leafs) {
-                    if (leaf.getChildOperators() == null || leaf.getChildOperators().size() == 0)
-                        leaf.setChildOperators(new ArrayList<Operator>());
-                    leaf.getChildOperators().add(mapChildren.get(i));
-                    i++;
-                }
-                i = 0;
-                for (Operator child : mapChildren) {
-                    if (child.getParentOperators() == null || child.getParentOperators().size() == 0)
-                        child.setParentOperators(new ArrayList<Operator>());
-                    child.getParentOperators().add(leafs.get(i));
-                    i++;
-                }
-            }
-        }
-
-        // mark this task as visited
-        this.tasksVisited.put(task, true);
-        return mapRootOps;
-    }
-
-    /**
-     * down to leaf nodes
-     * 
-     * @param ops
-     * @param leaves
-     */
-    private void downToLeaves(List<Operator> ops, List<Operator> leaves) {
-
-        // Operator currentOp;
-        for (Operator op : ops) {
-            if (op != null && op.getChildOperators() != null && op.getChildOperators().size() > 0) {
-                downToLeaves(op.getChildOperators(), leaves);
-            } else {
-                if (op != null && leaves.indexOf(op) < 0)
-                    leaves.add(op);
-            }
-        }
-    }
-
-    private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {
-        for (Operator op : roots) {
-            List<Operator> children = op.getChildOperators();
-            if (children == null || children.size() <= 0) {
-                currentLeaves.add(op);
-            } else {
-                getLeaves(children, currentLeaves);
-            }
-        }
-    }
-
-    private void executeHyracksJob(JobSpecification job) throws Exception {
-
-        /**
-         * load the properties file if it is not loaded
-         */
-        if (clusterProps == null) {
-            clusterProps = new Properties();
-            InputStream confIn = new FileInputStream(clusterPropertiesPath);
-            clusterProps.load(confIn);
-            confIn.close();
-        }
-
-        if (hcc == null) {
-            BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
-            String masterNode = ipReader.readLine();
-            ipReader.close();
-
-            InetAddress[] ips = InetAddress.getAllByName(masterNode);
-            int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
-            for (InetAddress ip : ips) {
-                if (ip.getAddress().length <= 4) {
-                    try {
-                        hcc = new HyracksConnection(ip.getHostAddress(), port);
-                        break;
-                    } catch (Exception e) {
-                        continue;
-                    }
-                }
-            }
-        }
-
-        long start = System.currentTimeMillis();
-        JobId jobId = hcc.startJob(job);
-        hcc.waitForCompletion(jobId);
-
-        // System.out.println("job finished: " + jobId.toString());
-        // call all leave nodes to end
-        for (Operator leaf : leaveOps) {
-            jobClose(leaf);
-        }
-
-        long end = System.currentTimeMillis();
-        System.err.println(start + " " + end + " " + (end - start));
-    }
-
-    /**
-     * mv to final directory on hdfs (not real final)
-     * 
-     * @param leaf
-     * @throws Exception
-     */
-    private void jobClose(Operator leaf) throws Exception {
-        FileSinkOperator fsOp = (FileSinkOperator) leaf;
-        FileSinkDesc desc = fsOp.getConf();
-        boolean isNativeTable = !desc.getTableInfo().isNonNative();
-        if ((conf != null) && isNativeTable) {
-            String specPath = desc.getDirName();
-            DynamicPartitionCtx dpCtx = desc.getDynPartCtx();
-            // for 0.7.0
-            fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);
-            // for 0.8.0
-            // Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,
-            // desc);
-        }
-    }
-}
+package edu.uci.ics.hivesterix.runtime.exec;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+
+import edu.uci.ics.hivesterix.common.config.ConfUtil;
+import edu.uci.ics.hivesterix.logical.expression.HiveExpressionTypeComputer;
+import edu.uci.ics.hivesterix.logical.expression.HiveMergeAggregationExpressionFactory;
+import edu.uci.ics.hivesterix.logical.expression.HiveNullableTypeComputer;
+import edu.uci.ics.hivesterix.logical.expression.HivePartialAggregationTypeComputer;
+import edu.uci.ics.hivesterix.logical.plan.HiveAlgebricksTranslator;
+import edu.uci.ics.hivesterix.logical.plan.HiveLogicalPlanAndMetaData;
+import edu.uci.ics.hivesterix.optimizer.rulecollections.HiveRuleCollections;
+import edu.uci.ics.hivesterix.runtime.factory.evaluator.HiveExpressionRuntimeProvider;
+import edu.uci.ics.hivesterix.runtime.factory.nullwriter.HiveNullWriterFactory;
+import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryBooleanInspectorFactory;
+import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryIntegerInspectorFactory;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy.Policy;
+import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryComparatorFactoryProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFamilyProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveNormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HivePrinterFactoryProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveSerializerDeserializerProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveTypeTraitProvider;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder;
+import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder.DefaultOptimizationContextFactory;
+import edu.uci.ics.hyracks.algebricks.compiler.api.ICompiler;
+import edu.uci.ics.hyracks.algebricks.compiler.api.ICompilerFactory;
+import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController;
+import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class HyracksExecutionEngine implements IExecutionEngine {
+
+    private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());
+    private static final String clusterPropertiesPath = "conf/cluster.properties";
+    private static final String masterFilePath = "conf/master";
+
+    private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
+    private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
+    static {
+        SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);
+        SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(true);
+        SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
+                HiveRuleCollections.NORMALIZATION));
+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
+                HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));
+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
+                HiveRuleCollections.LOAD_FIELDS));
+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
+                HiveRuleCollections.OP_PUSHDOWN));
+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
+                HiveRuleCollections.DATA_EXCHANGE));
+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
+                HiveRuleCollections.CONSOLIDATION));
+
+        DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
+                HiveRuleCollections.PHYSICAL_PLAN_REWRITES));
+        DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
+                HiveRuleCollections.prepareJobGenRules));
+    }
+
+    /**
+     * static configurations for compiler
+     */
+    private HeuristicCompilerFactoryBuilder builder;
+
+    /**
+     * compiler
+     */
+    private ICompiler compiler;
+
+    /**
+     * physical optimization config
+     */
+    private PhysicalOptimizationConfig physicalOptimizationConfig;
+
+    /**
+     * final ending operators
+     */
+    private List<Operator> leaveOps = new ArrayList<Operator>();
+
+    /**
+     * tasks that are already visited
+     */
+    private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();
+
+    /**
+     * hyracks job spec
+     */
+    private JobSpecification jobSpec;
+
+    /**
+     * hive configuration
+     */
+    private HiveConf conf;
+
+    /**
+     * plan printer
+     */
+    private PrintWriter planPrinter;
+
+    /**
+     * properties
+     */
+    private Properties clusterProps;
+
+    /**
+     * the Hyracks client connection
+     */
+    private IHyracksClientConnection hcc;
+
+    public HyracksExecutionEngine(HiveConf conf) {
+        this.conf = conf;
+        init(conf);
+    }
+
+    public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {
+        this.conf = conf;
+        this.planPrinter = planPrinter;
+        init(conf);
+    }
+
+    private void init(HiveConf conf) {
+        builder = new HeuristicCompilerFactoryBuilder(DefaultOptimizationContextFactory.INSTANCE);
+        builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);
+        builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);
+        builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);
+        builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);
+        builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);
+
+        long memSizeExternalGby = conf.getLong("hive.algebricks.groupby.external.memory", 268435456);
+        long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory", 536870912);
+        int frameSize = conf.getInt("hive.algebricks.framesize", 32768);
+
+        physicalOptimizationConfig = new PhysicalOptimizationConfig();
+        int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);
+        physicalOptimizationConfig.setMaxFramesExternalGroupBy(frameLimitExtGby);
+        int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);
+        physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);
+        builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);
+    }
+
+    @Override
+    public int compileJob(List<Task<? extends Serializable>> rootTasks) {
+        // clean up
+        leaveOps.clear();
+        tasksVisited.clear();
+        jobSpec = null;
+
+        HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();
+        List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);
+
+        // get all leave Ops
+        getLeaves(rootOps, leaveOps);
+
+        HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();
+        try {
+            translator.translate(rootOps, null, aliasToPath);
+
+            ILogicalPlan plan = translator.genLogicalPlan();
+
+            if (plan.getRoots() != null && plan.getRoots().size() > 0 && plan.getRoots().get(0).getValue() != null) {
+                translator.printOperators();
+                ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(plan,
+                        translator.getMetadataProvider());
+
+                ICompilerFactory compilerFactory = builder.create();
+                compiler = compilerFactory.createCompiler(planAndMetadata.getPlan(),
+                        planAndMetadata.getMetadataProvider(), translator.getVariableCounter());
+
+                // run optimization and re-writing rules for Hive plan
+                compiler.optimize();
+
+                // print optimized plan
+                LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
+                StringBuilder buffer = new StringBuilder();
+                PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
+                String planStr = buffer.toString();
+                System.out.println(planStr);
+
+                if (planPrinter != null)
+                    planPrinter.print(planStr);
+            } else {
+                /** it is not a map reduce task DAG */
+                return 2;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return 1;
+        }
+
+        return 0;
+    }
+
+    private void codeGen() throws AlgebricksException {
+        try {
+            // number of cpu cores in the cluster
+            builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(ConfUtil.getNCs()));
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        // builder.setClusterTopology(ConfUtil.getClusterTopology());
+        builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);
+        builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);
+        builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);
+        builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);
+        builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);
+        builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);
+        builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);
+        builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);
+        builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);
+        builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);
+        builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);
+        builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);
+
+        jobSpec = compiler.createJob(null, null);
+
+        // set the policy
+        String policyStr = conf.get("hive.hyracks.connectorpolicy");
+        if (policyStr == null)
+            policyStr = "PIPELINING";
+        Policy policyValue = Policy.valueOf(policyStr);
+        jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(policyValue));
+        jobSpec.setUseConnectorPolicyForScheduling(false);
+    }
+
+    @Override
+    public int executeJob() {
+        try {
+            codeGen();
+            executeHyracksJob(jobSpec);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return 1;
+        }
+        return 0;
+    }
+
+    private List<Operator> generateRootOperatorDAG(List<Task<? extends Serializable>> rootTasks,
+            HashMap<String, PartitionDesc> aliasToPath) {
+
+        List<Operator> rootOps = new ArrayList<Operator>();
+        List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();
+        tasksVisited.clear();
+
+        for (int i = rootTasks.size() - 1; i >= 0; i--) {
+            /**
+             * list of map-reduce tasks
+             */
+            Task<? extends Serializable> task = rootTasks.get(i);
+
+            if (task instanceof MapRedTask) {
+                List<Operator> mapRootOps = articulateMapReduceOperators(task, rootOps, aliasToPath, rootTasks);
+                if (i == 0)
+                    rootOps.addAll(mapRootOps);
+                else {
+                    List<Operator> leaves = new ArrayList<Operator>();
+                    getLeaves(rootOps, leaves);
+
+                    List<Operator> mapChildren = new ArrayList<Operator>();
+                    for (Operator childMap : mapRootOps) {
+                        if (childMap instanceof TableScanOperator) {
+                            TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
+                            if (topDesc == null)
+                                mapChildren.add(childMap);
+                            else {
+                                rootOps.add(childMap);
+                            }
+                        } else
+                            mapChildren.add(childMap);
+                    }
+
+                    if (mapChildren.size() > 0) {
+                        for (Operator leaf : leaves)
+                            leaf.setChildOperators(mapChildren);
+                        for (Operator child : mapChildren)
+                            child.setParentOperators(leaves);
+                    }
+                }
+
+                MapredWork mr = (MapredWork) task.getWork();
+                HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
+
+                addAliasToPartition(aliasToPath, map);
+                toDelete.add(task);
+            }
+        }
+
+        for (Task<? extends Serializable> task : toDelete)
+            rootTasks.remove(task);
+
+        return rootOps;
+    }
+
+    private void addAliasToPartition(HashMap<String, PartitionDesc> aliasToPath, HashMap<String, PartitionDesc> map) {
+        Iterator<String> keys = map.keySet().iterator();
+        while (keys.hasNext()) {
+            String key = keys.next();
+            PartitionDesc part = map.get(key);
+            String[] names = key.split(":");
+            for (String name : names) {
+                aliasToPath.put(name, part);
+            }
+        }
+    }
+
+    private List<Operator> articulateMapReduceOperators(Task task, List<Operator> rootOps,
+            HashMap<String, PartitionDesc> aliasToPath, List<Task<? extends Serializable>> rootTasks) {
+        // System.out.println("!"+task.getName());
+        if (!(task instanceof MapRedTask)) {
+            if (!(task instanceof ConditionalTask)) {
+                rootTasks.add(task);
+                return null;
+            } else {
+                // remove map-reduce branches in condition task
+                ConditionalTask condition = (ConditionalTask) task;
+                List<Task<? extends Serializable>> branches = condition.getListTasks();
+                for (int i = branches.size() - 1; i >= 0; i--) {
+                    Task branch = branches.get(i);
+                    if (branch instanceof MapRedTask) {
+                        return articulateMapReduceOperators(branch, rootOps, aliasToPath, rootTasks);
+                    }
+                }
+                rootTasks.add(task);
+                return null;
+            }
+        }
+
+        MapredWork mr = (MapredWork) task.getWork();
+        HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
+
+        // put all aliasToParitionDesc mapping into the map
+        addAliasToPartition(aliasToPath, map);
+
+        MapRedTask mrtask = (MapRedTask) task;
+        MapredWork work = (MapredWork) mrtask.getWork();
+        HashMap<String, Operator<? extends Serializable>> operators = work.getAliasToWork();
+
+        Set entries = operators.entrySet();
+        Iterator<Entry<String, Operator>> iterator = entries.iterator();
+        List<Operator> mapRootOps = new ArrayList<Operator>();
+
+        // get map root operators
+        while (iterator.hasNext()) {
+            Operator next = iterator.next().getValue();
+            if (!mapRootOps.contains(next)) {
+                // clear that only for the case of union
+                mapRootOps.add(next);
+            }
+        }
+
+        // get map local work
+        MapredLocalWork localWork = work.getMapLocalWork();
+        if (localWork != null) {
+            HashMap<String, Operator<? extends Serializable>> localOperators = localWork.getAliasToWork();
+
+            Set localEntries = localOperators.entrySet();
+            Iterator<Entry<String, Operator>> localIterator = localEntries.iterator();
+            while (localIterator.hasNext()) {
+                mapRootOps.add(localIterator.next().getValue());
+            }
+
+            HashMap<String, FetchWork> localFetch = localWork.getAliasToFetchWork();
+            Set localFetchEntries = localFetch.entrySet();
+            Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries.iterator();
+            while (localFetchIterator.hasNext()) {
+                Entry<String, FetchWork> fetchMap = localFetchIterator.next();
+                FetchWork fetch = fetchMap.getValue();
+                String alias = fetchMap.getKey();
+                List<PartitionDesc> dirPart = fetch.getPartDesc();
+
+                // temporary hack: put the first partitionDesc into the map
+                aliasToPath.put(alias, dirPart.get(0));
+            }
+        }
+
+        Boolean visited = tasksVisited.get(task);
+        if (visited != null && visited.booleanValue() == true) {
+            return mapRootOps;
+        }
+
+        // do that only for union operator
+        for (Operator op : mapRootOps)
+            if (op.getParentOperators() != null)
+                op.getParentOperators().clear();
+
+        List<Operator> mapLeaves = new ArrayList<Operator>();
+        downToLeaves(mapRootOps, mapLeaves);
+        List<Operator> reduceOps = new ArrayList<Operator>();
+
+        if (work.getReducer() != null)
+            reduceOps.add(work.getReducer());
+
+        for (Operator mapLeaf : mapLeaves) {
+            mapLeaf.setChildOperators(reduceOps);
+        }
+
+        for (Operator reduceOp : reduceOps) {
+            if (reduceOp != null)
+                reduceOp.setParentOperators(mapLeaves);
+        }
+
+        List<Operator> leafs = new ArrayList<Operator>();
+        if (reduceOps.size() > 0) {
+            downToLeaves(reduceOps, leafs);
+        } else {
+            leafs = mapLeaves;
+        }
+
+        List<Operator> mapChildren = new ArrayList<Operator>();
+        if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {
+            for (Object child : task.getChildTasks()) {
+                List<Operator> childMapOps = articulateMapReduceOperators((Task) child, rootOps, aliasToPath, rootTasks);
+                if (childMapOps == null)
+                    continue;
+
+                for (Operator childMap : childMapOps) {
+                    if (childMap instanceof TableScanOperator) {
+                        TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
+                        if (topDesc == null)
+                            mapChildren.add(childMap);
+                        else {
+                            rootOps.add(childMap);
+                        }
+                    } else {
+                        // if not table scan, add the child
+                        mapChildren.add(childMap);
+                    }
+                }
+            }
+
+            if (mapChildren.size() > 0) {
+                int i = 0;
+                for (Operator leaf : leafs) {
+                    if (leaf.getChildOperators() == null || leaf.getChildOperators().size() == 0)
+                        leaf.setChildOperators(new ArrayList<Operator>());
+                    leaf.getChildOperators().add(mapChildren.get(i));
+                    i++;
+                }
+                i = 0;
+                for (Operator child : mapChildren) {
+                    if (child.getParentOperators() == null || child.getParentOperators().size() == 0)
+                        child.setParentOperators(new ArrayList<Operator>());
+                    child.getParentOperators().add(leafs.get(i));
+                    i++;
+                }
+            }
+        }
+
+        // mark this task as visited
+        this.tasksVisited.put(task, true);
+        return mapRootOps;
+    }
+
+    /**
+     * down to leaf nodes
+     * 
+     * @param ops
+     * @param leaves
+     */
+    private void downToLeaves(List<Operator> ops, List<Operator> leaves) {
+
+        // Operator currentOp;
+        for (Operator op : ops) {
+            if (op != null && op.getChildOperators() != null && op.getChildOperators().size() > 0) {
+                downToLeaves(op.getChildOperators(), leaves);
+            } else {
+                if (op != null && leaves.indexOf(op) < 0)
+                    leaves.add(op);
+            }
+        }
+    }
+
+    private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {
+        for (Operator op : roots) {
+            List<Operator> children = op.getChildOperators();
+            if (children == null || children.size() <= 0) {
+                currentLeaves.add(op);
+            } else {
+                getLeaves(children, currentLeaves);
+            }
+        }
+    }
+
+    private void executeHyracksJob(JobSpecification job) throws Exception {
+
+        /**
+         * load the properties file if it is not loaded
+         */
+        if (clusterProps == null) {
+            clusterProps = new Properties();
+            InputStream confIn = new FileInputStream(clusterPropertiesPath);
+            clusterProps.load(confIn);
+            confIn.close();
+        }
+
+        if (hcc == null) {
+            BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
+            String masterNode = ipReader.readLine();
+            ipReader.close();
+
+            InetAddress[] ips = InetAddress.getAllByName(masterNode);
+            int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
+            for (InetAddress ip : ips) {
+                if (ip.getAddress().length <= 4) {
+                    try {
+                        hcc = new HyracksConnection(ip.getHostAddress(), port);
+                        break;
+                    } catch (Exception e) {
+                        continue;
+                    }
+                }
+            }
+        }
+
+        long start = System.currentTimeMillis();
+        JobId jobId = hcc.startJob(job);
+        hcc.waitForCompletion(jobId);
+
+        // System.out.println("job finished: " + jobId.toString());
+        // call all leave nodes to end
+        for (Operator leaf : leaveOps) {
+            jobClose(leaf);
+        }
+
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+
+    /**
+     * mv to final directory on hdfs (not real final)
+     * 
+     * @param leaf
+     * @throws Exception
+     */
+    private void jobClose(Operator leaf) throws Exception {
+        FileSinkOperator fsOp = (FileSinkOperator) leaf;
+        FileSinkDesc desc = fsOp.getConf();
+        boolean isNativeTable = !desc.getTableInfo().isNonNative();
+        if ((conf != null) && isNativeTable) {
+            String specPath = desc.getDirName();
+            DynamicPartitionCtx dpCtx = desc.getDynPartCtx();
+            // for 0.7.0
+            fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);
+            // for 0.8.0
+            // Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,
+            // desc);
+        }
+    }
+}
diff --git a/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/ql/Driver.java b/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/ql/Driver.java
index 4c40f5d..4ef74e9 100644
--- a/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/ql/Driver.java
@@ -444,10 +444,11 @@
 
             // hyracks run
             if (sem instanceof SemanticAnalyzer && command.toLowerCase().indexOf("create") < 0) {
-                hivesterix = true;
-                return engine.compileJob(sem.getRootTasks());
+                int engineRet = engine.compileJob(sem.getRootTasks());
+                if (engineRet == 0) {
+                    hivesterix = true;
+                }
             }
-
             return 0;
         } catch (SemanticException e) {
             errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
diff --git a/hivesterix/hivesterix-dist/src/main/resources/conf/hive-default.xml b/hivesterix/hivesterix-dist/src/main/resources/conf/hive-default.xml
index 1fce28e..23a842a 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/conf/hive-default.xml
+++ b/hivesterix/hivesterix-dist/src/main/resources/conf/hive-default.xml
@@ -36,7 +36,8 @@
 			whereas hive uses -1 as its default value.
 			By setting this property to -1, Hive will automatically figure out what
 			should be the number of reducers.
-  </description>
+        	</description>
+        </property>
 
         <property>
 		<name>hive.hyracks.connectorpolicy</name>
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/getip.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/getip.sh
index d11b091..0e737c5 100755
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/getip.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/getip.sh
@@ -47,10 +47,15 @@
 		IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
         fi 
 else
-        IPADDR=`/sbin/ifconfig en1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
-	if [ "$IPADDR" = "" ]
+        #Get IP Address
+        IPADDR=`/sbin/ifconfig en0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        if [ "$IPADDR" = "" ]
         then
-                IPADDR=`/sbin/ifconfig lo0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+                IPADDR=`/sbin/ifconfig en1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        fi
+        if [ "$IPADDR" = "" ]
+        then
+                IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
         fi
 
 fi
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 7c91c02..4175078 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -420,7 +420,7 @@
     private M allocateMessage() {
         M message;
         if (usedMessage < msgPool.size()) {
-            message = msgPool.get(usedEdge);
+            message = msgPool.get(usedMessage);
             usedMessage++;
         } else {
             message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexPartitioner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexPartitioner.java
new file mode 100644
index 0000000..f51ad88
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexPartitioner.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.graph;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Users can extend this class to implement the desired vertex partitioning behavior.
+ * 
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexPartitioner<I extends WritableComparable> {
+
+    /**
+     * @param vertexId
+     *            The input vertex id.
+     * @param nPartitions
+     *            The total number of partitions.
+     * @return The partition id.
+     */
+    public abstract int getPartitionId(I vertexId, int nPartitions);
+
+}
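
To illustrate the extension point added in VertexPartitioner.java, here is a hypothetical range partitioner; only the abstract class comes from the patch, while the LongWritable id type and the MAX_ID bound are assumptions made for the sketch.

    import org.apache.hadoop.io.LongWritable;

    import edu.uci.ics.pregelix.api.graph.VertexPartitioner;

    // Hypothetical subclass: buckets contiguous id ranges together, which can
    // improve locality for range-clustered graphs. Assumes non-negative ids.
    public class RangeVertexPartitioner extends VertexPartitioner<LongWritable> {
        private static final long MAX_ID = 1L << 40; // assumed upper bound on ids

        @Override
        public int getPartitionId(LongWritable vertexId, int nPartitions) {
            long bucket = vertexId.get() / (MAX_ID / nPartitions);
            return (int) Math.min(bucket, nPartitions - 1);
        }
    }
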
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 81472aa..4cddaf0 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
 
@@ -59,6 +60,8 @@
     public static final String FINAL_AGGREGATE_VALUE_CLASS = "pregelix.finalAggregateValueClass";
     /** The normalized key computer class */
     public static final String NMK_COMPUTER_CLASS = "pregelix.nmkComputerClass";
+    /** The partitioner class */
+    public static final String PARTITIONER_CLASS = "pregelix.partitionerClass";
     /** num of vertices */
     public static final String NUM_VERTICE = "pregelix.numVertices";
     /** num of edges */
@@ -69,6 +72,8 @@
     public static final String JOB_ID = "pregelix.jobid";
     /** frame size */
     public static final String FRAME_SIZE = "pregelix.framesize";
+    /** update intensive */
+    public static final String UPDATE_INTENSIVE = "pregelix.updateIntensive";
 
     /**
      * Constructor that will instantiate the configuration
@@ -178,4 +183,22 @@
     final public void setNoramlizedKeyComputerClass(Class<?> nkcClass) {
         getConfiguration().setClass(NMK_COMPUTER_CLASS, nkcClass, NormalizedKeyComputer.class);
     }
+
+    /**
+     * Set the vertex partitioner class
+     * 
+     * @param partitionerClass
+     */
+    final public void setVertexPartitionerClass(Class<?> partitionerClass) {
+        getConfiguration().setClass(PARTITIONER_CLASS, partitionerClass, VertexPartitioner.class);
+    }
+
+    /**
+     * Indicate whether the job performs many graph mutations or variable-sized updates,
+     * in which case vertices are stored in an LSM B-tree
+     * 
+     * @param variableSizedUpdateHeavyFlag
+     */
+    final public void setMutationOrVariableSizedUpdateHeavy(boolean variableSizedUpdateHeavyFlag) {
+        getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
+    }
 }
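
A minimal usage sketch for the two setters added above, assuming the existing PregelixJob(String) constructor; the job name and the choice of DefaultVertexPartitioner are illustrative.

    import edu.uci.ics.pregelix.api.job.PregelixJob;
    import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;

    public class PartitionerConfigSketch {
        public static void main(String[] args) throws Exception {
            PregelixJob job = new PregelixJob("example-job");
            job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
            // Hint that the job mutates the graph heavily, so vertices are
            // stored in an LSM B-tree rather than an in-place B-tree.
            job.setMutationOrVariableSizedUpdateHeavy(true);
        }
    }
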
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 6bac923..03c37dc 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -156,7 +157,7 @@
     }
 
     /**
-     * Create a global aggregator class
+     * Create a global aggregator object
      * 
      * @param conf
      *            Configuration to check
@@ -391,9 +392,9 @@
         try {
             return aggregateValueClass.newInstance();
         } catch (InstantiationException e) {
-            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+            throw new IllegalArgumentException("createPartialAggregateValue: Failed to instantiate", e);
         } catch (IllegalAccessException e) {
-            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+            throw new IllegalArgumentException("createPartialAggregateValue: Illegally accessed", e);
         }
     }
 
@@ -415,9 +416,9 @@
             }
             return instance;
         } catch (InstantiationException e) {
-            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+            throw new IllegalArgumentException("createPartialCombineValue: Failed to instantiate", e);
         } catch (IllegalAccessException e) {
-            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+            throw new IllegalArgumentException("createPartialCombineValue: Illegally accessed", e);
         }
     }
 
@@ -433,13 +434,46 @@
         try {
             return aggregateValueClass.newInstance();
         } catch (InstantiationException e) {
-            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+            throw new IllegalArgumentException("createAggregateValue: Failed to instantiate", e);
         } catch (IllegalAccessException e) {
-            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+            throw new IllegalArgumentException("createAggregateValue: Illegally accessed", e);
         }
     }
 
     /**
+     * Create a vertex partitioner object
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated vertex partitioner object
+     */
+    @SuppressWarnings("rawtypes")
+    public static VertexPartitioner createVertexPartitioner(Configuration conf) {
+        Class<? extends VertexPartitioner> vertexPartitionerClass = getVertexPartitionerClass(conf);
+        try {
+            return vertexPartitionerClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createVertexPartitioner: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createVertexPartitioner: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed vertex partitioner class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return The user defined vertex partitioner class
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public static <V extends VertexPartitioner> Class<V> getVertexPartitionerClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<V>) conf.getClass(PregelixJob.PARTITIONER_CLASS, null, VertexPartitioner.class);
+    }
+
+    /**
      * Get the job configuration parameter whether the vertex states will increase dynamically
      * 
      * @param conf
@@ -460,4 +494,14 @@
     public static int getFrameSize(Configuration conf) {
         return conf.getInt(PregelixJob.FRAME_SIZE, -1);
     }
+
+    /**
+     * Whether the job should store vertices in an LSM B-tree rather than a plain B-tree
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return true to use the LSM B-tree, false to use the plain B-tree
+     */
+    public static boolean useLSM(Configuration conf) {
+        return conf.getBoolean(PregelixJob.UPDATE_INTENSIVE, false);
+    }
 }
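
The following sketch shows the lookup contract of the BspUtils helpers added above: an unset partitioner class comes back as null, which callers such as JobGen use to fall back to id-hash partitioning, and useLSM defaults to false. The bare Configuration is the only assumption.

    import org.apache.hadoop.conf.Configuration;

    import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
    import edu.uci.ics.pregelix.api.util.BspUtils;

    @SuppressWarnings("rawtypes")
    public class PartitionerLookupSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            Class<? extends VertexPartitioner> clazz = BspUtils.getVertexPartitionerClass(conf);
            if (clazz == null) {
                System.out.println("no custom partitioner; default id-hash path");
            } else {
                VertexPartitioner p = BspUtils.createVertexPartitioner(conf);
                System.out.println("using " + p.getClass().getName());
            }
            System.out.println("LSM storage requested: " + BspUtils.useLSM(conf));
        }
    }
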
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultVertexPartitioner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultVertexPartitioner.java
new file mode 100644
index 0000000..263ec65
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultVertexPartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
+
+/**
+ * The default vertex partitioner, which uses the hash code of the vertex id to determine
+ * the partition of the vertex.
+ * 
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class DefaultVertexPartitioner<I extends WritableComparable> extends VertexPartitioner<I> {
+
+    @Override
+    public int getPartitionId(I vertexId, int nPartitions) {
+        // Mask the sign bit so a negative hashCode() cannot yield a negative partition id.
+        return (vertexId.hashCode() & Integer.MAX_VALUE) % nPartitions;
+    }
+
+}
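
A tiny demonstration of the default partitioner, assuming LongWritable ids: equal ids always map to the same partition and, with the sign bit masked as above, the result stays within [0, nPartitions).

    import org.apache.hadoop.io.LongWritable;

    import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;

    public class DefaultPartitionerDemo {
        public static void main(String[] args) {
            DefaultVertexPartitioner<LongWritable> p = new DefaultVertexPartitioner<LongWritable>();
            for (long id : new long[] { 0L, 1L, 42L, -7L }) {
                System.out.println(id + " -> " + p.getPartitionId(new LongWritable(id), 4));
            }
        }
    }
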
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
index c0bbafd..1600ab5 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
@@ -45,4 +45,15 @@
             throw new HyracksDataException(e);
         }
     }
+
+    @Override
+    public Configuration createConfiguration() throws HyracksDataException {
+        try {
+            Configuration conf = new Configuration();
+            SerDeUtils.deserialize(conf, data);
+            return conf;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index dacb432..dc1c73f 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -53,15 +53,22 @@
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -80,8 +87,10 @@
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
+import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexPartitionComputerFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
 
 public abstract class JobGen implements IJobGen {
@@ -169,7 +178,7 @@
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
         TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
-                new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+                getIndexDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
                 NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, btreeCreate);
         spec.setFrameSize(frameSize);
@@ -230,15 +239,14 @@
         typeTraits[1] = new TypeTraits(false);
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
-                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, false, new BTreeDataflowHelperFactory(),
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, false, getIndexDataflowHelperFactory(),
                 NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
         /**
          * connect operator descriptors
          */
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
         spec.setFrameSize(frameSize);
@@ -313,8 +321,7 @@
         /**
          * connect operator descriptors
          */
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
                 comparatorFactories), sorter, 0, writer, 0);
@@ -358,7 +365,7 @@
         typeTraits[1] = new TypeTraits(false);
         BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
-                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
+                null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -381,8 +388,7 @@
          */
         int[] sortFields = new int[1];
         sortFields[0] = 0;
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
                 comparatorFactories), scanner, 0, writer, 0);
@@ -428,7 +434,7 @@
 
         BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
-                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
+                null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -459,7 +465,7 @@
 
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
         IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
-                lcManagerProvider, fileSplitProvider, new BTreeDataflowHelperFactory());
+                lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
 
         ClusterConfig.setLocationConstraint(spec, drop);
         spec.addRoot(drop);
@@ -467,6 +473,28 @@
         return spec;
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    protected ITuplePartitionComputerFactory getVertexPartitionComputerFactory() {
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        Class<? extends VertexPartitioner> partitionerClazz = BspUtils.getVertexPartitionerClass(conf);
+        if (partitionerClazz != null) {
+            return new VertexPartitionComputerFactory(confFactory);
+        } else {
+            return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
+                    BspUtils.getVertexIndexClass(conf)));
+        }
+    }
+
+    protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
+        if (BspUtils.useLSM(conf)) {
+            return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
+                    3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+                    NoOpIOOperationCallback.INSTANCE, 0.01);
+        } else {
+            return new BTreeDataflowHelperFactory();
+        }
+    }
+
     /** generate non-first iteration job */
     protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
 
@@ -476,4 +504,4 @@
     /** generate clean-up job */
     public abstract JobSpecification[] generateCleanup() throws HyracksException;
 
-}
\ No newline at end of file
+}
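
To make the dispatch in the two protected helpers above concrete, this sketch traces the UPDATE_INTENSIVE flag from the job configuration to the LSM-versus-B-tree decision; the class and flag names follow the patch, while the stand-alone wiring is assumed.

    import org.apache.hadoop.conf.Configuration;

    import edu.uci.ics.pregelix.api.job.PregelixJob;
    import edu.uci.ics.pregelix.api.util.BspUtils;

    public class IndexChoiceSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Set by PregelixJob.setMutationOrVariableSizedUpdateHeavy(true).
            conf.setBoolean(PregelixJob.UPDATE_INTENSIVE, true);
            System.out.println(BspUtils.useLSM(conf)
                    ? "JobGen would build an LSMBTreeDataflowHelperFactory"
                    : "JobGen would build a BTreeDataflowHelperFactory");
        }
    }
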
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index d9a24bc..2bab291 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -35,14 +35,9 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -59,11 +54,11 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
@@ -72,8 +67,6 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
 
 public class JobGenInnerJoin extends JobGen {
 
@@ -135,12 +128,11 @@
                 vertexClass.getName());
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
-        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+        TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 6,
-                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate, rdInsert, rdDelete, rdFinal);
+                getIndexDataflowHelperFactory(), inputRdFactory, 6, new StartComputeUpdateFunctionFactory(confFactory),
+                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -168,7 +160,7 @@
                 WritableComparator.get(vertexIdClass).getClass());
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
-                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
         /**
@@ -223,7 +215,7 @@
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
@@ -234,7 +226,7 @@
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -245,8 +237,7 @@
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
 
         /** connect all operators **/
@@ -339,14 +330,9 @@
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
-        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
-                typeTraits));
-        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
-                typeTraits));
         IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
-                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
-                leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
-                new BTreeDataflowHelperFactory(), true);
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
+                comparatorFactories, true, keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true);
         ClusterConfig.setLocationConstraint(spec, setUnion);
 
         /**
@@ -364,7 +350,7 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
                 JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
+                getIndexDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
         ClusterConfig.setLocationConstraint(spec, join);
 
@@ -379,7 +365,7 @@
         IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
-                indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+                indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
         /**
@@ -447,7 +433,7 @@
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
@@ -458,7 +444,7 @@
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -470,8 +456,7 @@
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 5faf122..3af8921 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -35,14 +35,9 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -59,9 +54,9 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
@@ -72,8 +67,6 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
 
 public class JobGenOuterJoin extends JobGen {
 
@@ -129,12 +122,11 @@
                 vertexClass.getName());
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
-        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+        TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 5,
-                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate, rdInsert, rdDelete);
+                getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
+                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -206,8 +198,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+                NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -216,8 +208,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -227,8 +219,7 @@
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -318,10 +309,6 @@
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
-        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
-                typeTraits));
-        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
-                typeTraits));
         INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
         nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
@@ -335,11 +322,10 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
-                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
-                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate, rdInsert, rdDelete);
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
+                nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
+                null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -408,8 +394,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+                NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -418,7 +404,7 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
@@ -431,8 +417,7 @@
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
 
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index fee738e..50949aa 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -34,14 +34,9 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -58,9 +53,9 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
@@ -71,8 +66,6 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
 
 public class JobGenOuterJoinSingleSort extends JobGen {
 
@@ -131,12 +124,11 @@
                 vertexClass.getName());
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
-        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+        TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 5,
-                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate, rdInsert, rdDelete);
+                getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
+                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -198,8 +190,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+                NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -208,8 +200,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -221,8 +213,7 @@
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -307,10 +298,6 @@
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
-        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
-                typeTraits));
-        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
-                typeTraits));
         INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
         nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
@@ -324,11 +311,10 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
-                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
-                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate, rdInsert, rdDelete);
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
+                nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
+                null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -385,8 +371,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+                NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -395,8 +381,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -408,8 +394,7 @@
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
 
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index eef6b7e..362e413 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -34,14 +34,9 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -58,9 +53,9 @@
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
@@ -71,8 +66,6 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
 
 public class JobGenOuterJoinSort extends JobGen {
 
@@ -128,12 +121,11 @@
                 vertexClass.getName());
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
-        BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+        TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 5,
-                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate, rdInsert, rdDelete);
+                getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
+                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -212,8 +204,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+                NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -222,8 +214,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -234,8 +226,7 @@
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -321,10 +312,6 @@
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
-        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
-                typeTraits));
-        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
-                typeTraits));
         INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
         nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
@@ -338,11 +325,10 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
-                leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
-                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate, rdInsert, rdDelete);
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
+                nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
+                null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -418,8 +404,8 @@
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
+                NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
@@ -428,8 +414,8 @@
         int[] fieldPermutationDelete = new int[] { 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
+                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -441,8 +427,7 @@
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
-                new WritableSerializerDeserializerFactory(vertexIdClass));
+        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
 
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
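
The hunks above replace every hard-coded new BTreeDataflowHelperFactory() in JobGenOuterJoinSort with a getIndexDataflowHelperFactory() call, so the operator wiring no longer names a concrete index implementation. A minimal sketch of such a hook, assuming a plain B-tree default and the hypothetical class name JobGenSketch (import paths follow the 0.2.7 tree; the LSM variant's factory takes extra providers that are not shown in this diff):

    import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
    import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;

    public abstract class JobGenSketch {
        // Single override point: a subclass (or a config switch) can return an
        // LSM-backed factory instead; the operator constructors above consume
        // only the IIndexDataflowHelperFactory interface.
        protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
            return new BTreeDataflowHelperFactory();
        }
    }
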
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 56fd676..73b053f 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -67,6 +67,7 @@
         ncConfig1.dataIPAddress = "127.0.0.1";
         ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
+        ncConfig1.ioDevices = "dev1,dev2";
         ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -79,6 +80,7 @@
         ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
+        ncConfig2.ioDevices = "dev1,dev2";
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
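The two added ioDevices lines give each test node controller two I/O devices, exercising the multi-device storage paths that the LSM indexes rely on. A condensed sketch of the NC bootstrap, assuming NCConfig and NodeControllerService at their 0.2.7 locations, with the cluster-controller address fields elided:

    import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
    import edu.uci.ics.hyracks.control.nc.NodeControllerService;

    NCConfig ncConfig1 = new NCConfig();
    // ... cc host/port and network addresses as set earlier in this method ...
    ncConfig1.nodeId = NC1_ID;
    // Comma-separated list of I/O device root paths; two devices per NC here.
    ncConfig1.ioDevices = "dev1,dev2";
    ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
    NodeControllerService nc1 = new NodeControllerService(ncConfig1);
    nc1.start();
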
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 0a478d4..3604e57 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -143,6 +143,13 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-lsm-btree</artifactId>
+			<version>0.2.7-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
 			<version>0.2.7-SNAPSHOT</version>
 			<type>jar</type>
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
index 0b3a7fe..e450380 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
@@ -26,7 +26,6 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -91,8 +90,7 @@
 
     public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
             IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
-            IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
-            ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
             boolean isRightOuter, INullWriterFactory[] nullWriterFactories, IRecordDescriptorFactory inputRdFactory,
@@ -125,8 +123,7 @@
 
     public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
             IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
-            IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
-            ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
             boolean isSetUnion, IRecordDescriptorFactory inputRdFactory, int outputArity,
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index bd45509..238b775 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -28,13 +28,10 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -54,7 +51,7 @@
 
     private ByteBuffer writeBuffer;
     private FrameTupleAppender appender;
-    private BTree btree;
+    private ITreeIndex index;
     private PermutingFrameTupleReference lowKey;
     private PermutingFrameTupleReference highKey;
     private boolean lowKeyInclusive;
@@ -62,9 +59,8 @@
     private RangePredicate rangePred;
     private MultiComparator lowKeySearchCmp;
     private MultiComparator highKeySearchCmp;
-    private ITreeIndexCursor cursor;
-    private ITreeIndexFrame cursorFrame;
-    protected ITreeIndexAccessor indexAccessor;
+    private IIndexCursor cursor;
+    protected IIndexAccessor indexAccessor;
 
     private RecordDescriptor recDesc;
     private final IFrameWriter[] writers;
@@ -99,7 +95,7 @@
     }
 
     protected void setCursor() {
-        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+        cursor = indexAccessor.createSearchCursor();
     }
 
     @Override
@@ -112,13 +108,11 @@
 
         try {
             treeIndexOpHelper.open();
-            btree = (BTree) treeIndexOpHelper.getIndexInstance();
-            cursorFrame = btree.getLeafFrameFactory().createFrame();
-            setCursor();
+            index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
 
             // TODO: Can we construct the multicmps using helper methods?
-            int lowKeySearchFields = btree.getComparatorFactories().length;
-            int highKeySearchFields = btree.getComparatorFactories().length;
+            int lowKeySearchFields = index.getComparatorFactories().length;
+            int highKeySearchFields = index.getComparatorFactories().length;
             if (lowKey != null)
                 lowKeySearchFields = lowKey.getFieldCount();
             if (highKey != null)
@@ -126,7 +120,7 @@
 
             IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
             for (int i = 0; i < lowKeySearchFields; i++) {
-                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
             }
             lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
 
@@ -135,10 +129,9 @@
             } else {
                 IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
                 for (int i = 0; i < highKeySearchFields; i++) {
-                    highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                    highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
                 }
                 highKeySearchCmp = new MultiComparator(highKeySearchComparators);
-
             }
 
             rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
@@ -147,9 +140,10 @@
             appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
-            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
-            updateBuffer.setFieldCount(btree.getFieldCount());
+            indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            setCursor();
+            cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+            updateBuffer.setFieldCount(index.getFieldCount());
         } catch (Exception e) {
             treeIndexOpHelper.close();
             throw new HyracksDataException(e);
@@ -201,7 +195,7 @@
             try {
                 cursor.close();
                 //batch update
-                updateBuffer.updateBTree(indexAccessor);
+                updateBuffer.updateIndex(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
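
This pushable, and the ones that follow, all apply the same refactoring: the concrete BTree plus leaf-frame cursor is replaced by the ITreeIndex / IIndexAccessor / IIndexCursor interfaces, and setCursor() now runs after the accessor exists, because the accessor is what knows which cursor implementation fits the index. A condensed sketch of the open path, using only calls visible in the hunks above:

    import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
    import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
    import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
    import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;

    // Inside open(): the dataflow helper yields an index instance; only the
    // tree-index interface is required, so an LSM B-tree satisfies it too.
    treeIndexOpHelper.open();
    ITreeIndex index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
    IIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
            NoOpOperationCallback.INSTANCE);
    // Replaces new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, ...):
    // the accessor creates whichever search cursor matches the index.
    IIndexCursor cursor = indexAccessor.createSearchCursor();
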
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
index 6dc713c..440ae86 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
@@ -87,8 +87,7 @@
 
     public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
             IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
-            IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
-            ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
             boolean isSetUnion) {
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
index d3114d2..221a818 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
@@ -29,10 +29,10 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -48,7 +48,7 @@
     private ArrayTupleBuilder tb;
     private DataOutput dos;
 
-    private BTree btree;
+    private ITreeIndex index;
     private PermutingFrameTupleReference lowKey;
     private PermutingFrameTupleReference highKey;
     private boolean lowKeyInclusive;
@@ -57,7 +57,7 @@
     private MultiComparator lowKeySearchCmp;
     private MultiComparator highKeySearchCmp;
     private IIndexCursor cursor;
-    protected ITreeIndexAccessor indexAccessor;
+    protected IIndexAccessor indexAccessor;
 
     private RecordDescriptor recDesc;
     private final RecordDescriptor inputRecDesc;
@@ -91,11 +91,11 @@
 
         try {
             treeIndexOpHelper.open();
-            btree = (BTree) treeIndexOpHelper.getIndexInstance();
+            index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
             writer.open();
 
-            int lowKeySearchFields = btree.getComparatorFactories().length;
-            int highKeySearchFields = btree.getComparatorFactories().length;
+            int lowKeySearchFields = index.getComparatorFactories().length;
+            int highKeySearchFields = index.getComparatorFactories().length;
             if (lowKey != null)
                 lowKeySearchFields = lowKey.getFieldCount();
             if (highKey != null)
@@ -103,7 +103,7 @@
 
             IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
             for (int i = 0; i < lowKeySearchFields; i++) {
-                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
             }
             lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
 
@@ -112,7 +112,7 @@
             } else {
                 IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
                 for (int i = 0; i < highKeySearchFields; i++) {
-                    highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                    highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
                 }
                 highKeySearchCmp = new MultiComparator(highKeySearchComparators);
             }
@@ -120,11 +120,11 @@
             rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
                     highKeySearchCmp);
             writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
-            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + index.getFieldCount());
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
-            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             setCursor();
         } catch (Exception e) {
             treeIndexOpHelper.close();
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index 289128e..06119ea 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -30,13 +30,10 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -60,13 +57,12 @@
     private ArrayTupleBuilder nullTupleBuilder;
     private DataOutput dos;
 
-    private BTree btree;
+    private ITreeIndex index;
     private RangePredicate rangePred;
     private MultiComparator lowKeySearchCmp;
     private MultiComparator highKeySearchCmp;
-    private ITreeIndexCursor cursor;
-    private ITreeIndexFrame cursorFrame;
-    protected ITreeIndexAccessor indexAccessor;
+    private IIndexCursor cursor;
+    protected IIndexAccessor indexAccessor;
 
     private RecordDescriptor recDesc;
     private final RecordDescriptor inputRecDesc;
@@ -111,7 +107,7 @@
     }
 
     protected void setCursor() {
-        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+        cursor = indexAccessor.createSearchCursor();
     }
 
     @Override
@@ -124,18 +120,16 @@
 
         try {
             treeIndexOpHelper.open();
-            btree = (BTree) treeIndexOpHelper.getIndexInstance();
-            cursorFrame = btree.getLeafFrameFactory().createFrame();
-            setCursor();
+            index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
 
             // construct range predicate
             // TODO: Can we construct the multicmps using helper methods?
-            int lowKeySearchFields = btree.getComparatorFactories().length;
-            int highKeySearchFields = btree.getComparatorFactories().length;
+            int lowKeySearchFields = index.getComparatorFactories().length;
+            int highKeySearchFields = index.getComparatorFactories().length;
 
             IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
             for (int i = 0; i < lowKeySearchFields; i++) {
-                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
             }
             lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
 
@@ -144,7 +138,7 @@
             } else {
                 IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
                 for (int i = 0; i < highKeySearchFields; i++) {
-                    highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                    highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
                 }
                 highKeySearchCmp = new MultiComparator(highKeySearchComparators);
             }
@@ -164,7 +158,8 @@
             appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
-            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            setCursor();
 
             /** set the search cursor */
             rangePred.setLowKey(null, true);
@@ -179,8 +174,8 @@
                 match = false;
             }
 
-            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
-            updateBuffer.setFieldCount(btree.getFieldCount());
+            cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+            updateBuffer.setFieldCount(index.getFieldCount());
         } catch (Exception e) {
             treeIndexOpHelper.close();
             throw new HyracksDataException(e);
@@ -239,7 +234,7 @@
             try {
                 cursor.close();
                 //batch update
-                updateBuffer.updateBTree(indexAccessor);
+                updateBuffer.updateIndex(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index 6f7643e..48812b7 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -30,13 +30,10 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -53,14 +50,13 @@
     private ArrayTupleBuilder tb;
     private DataOutput dos;
 
-    private BTree btree;
+    private ITreeIndex index;
     private boolean isForward;
     private RangePredicate rangePred;
     private MultiComparator lowKeySearchCmp;
     private MultiComparator highKeySearchCmp;
-    private ITreeIndexCursor cursor;
-    private ITreeIndexFrame cursorFrame;
-    protected ITreeIndexAccessor indexAccessor;
+    private IIndexCursor cursor;
+    protected IIndexAccessor indexAccessor;
 
     private RecordDescriptor recDesc;
     private final RecordDescriptor inputRecDesc;
@@ -93,7 +89,7 @@
     }
 
     protected void setCursor() {
-        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+        cursor = indexAccessor.createSearchCursor();
     }
 
     @Override
@@ -101,19 +97,18 @@
         accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
         try {
             treeIndexOpHelper.open();
-            btree = (BTree) treeIndexOpHelper.getIndexInstance();
-            cursorFrame = btree.getLeafFrameFactory().createFrame();
+            index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
             setCursor();
             writer.open();
 
             // construct range predicate
             // TODO: Can we construct the multicmps using helper methods?
-            int lowKeySearchFields = btree.getComparatorFactories().length;
-            int highKeySearchFields = btree.getComparatorFactories().length;
+            int lowKeySearchFields = index.getComparatorFactories().length;
+            int highKeySearchFields = index.getComparatorFactories().length;
 
             IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
             for (int i = 0; i < lowKeySearchFields; i++) {
-                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
             }
             lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
 
@@ -122,7 +117,7 @@
             } else {
                 IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
                 for (int i = 0; i < highKeySearchFields; i++) {
-                    highKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                    highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
                 }
                 highKeySearchCmp = new MultiComparator(highKeySearchComparators);
 
@@ -131,12 +126,12 @@
             rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
 
             writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
-            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
+            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + index.getFieldCount());
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
-            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 
             /** set the search cursor */
             rangePred.setLowKey(null, true);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index c978614..ee821df 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -28,13 +28,10 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -55,13 +52,12 @@
     private ByteBuffer writeBuffer;
     private FrameTupleAppender appender;
 
-    private BTree btree;
+    private ITreeIndex index;
     private boolean isForward;
     private RangePredicate rangePred;
     private MultiComparator lowKeySearchCmp;
-    private ITreeIndexCursor cursor;
-    private ITreeIndexFrame cursorFrame;
-    protected ITreeIndexAccessor indexAccessor;
+    private IIndexCursor cursor;
+    protected IIndexAccessor indexAccessor;
 
     private RecordDescriptor recDesc;
     private PermutingFrameTupleReference lowKey;
@@ -102,7 +98,7 @@
     }
 
     protected void setCursor() {
-        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
+        cursor = indexAccessor.createSearchCursor();
     }
 
     @Override
@@ -112,15 +108,13 @@
 
         try {
             treeIndexOpHelper.open();
-            btree = (BTree) treeIndexOpHelper.getIndexInstance();
-            cursorFrame = btree.getLeafFrameFactory().createFrame();
-            setCursor();
+            index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
 
             rangePred = new RangePredicate(null, null, true, true, null, null);
-            int lowKeySearchFields = btree.getComparatorFactories().length;
+            int lowKeySearchFields = index.getComparatorFactories().length;
             IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
             for (int i = 0; i < lowKeySearchFields; i++) {
-                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
             }
             lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
 
@@ -128,7 +122,8 @@
             appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
-            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            setCursor();
 
             /** set the search cursor */
             rangePred.setLowKey(null, true);
@@ -142,8 +137,8 @@
                 currentTopTuple = cursor.getTuple();
                 match = false;
             }
-            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
-            updateBuffer.setFieldCount(btree.getFieldCount());
+            cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+            updateBuffer.setFieldCount(index.getFieldCount());
         } catch (Exception e) {
             treeIndexOpHelper.close();
             throw new HyracksDataException(e);
@@ -210,7 +205,7 @@
                 cursor.close();
 
                 //batch update
-                updateBuffer.updateBTree(indexAccessor);
+                updateBuffer.updateIndex(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index eef7c85..d6f95f9 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -29,13 +29,10 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -51,12 +48,11 @@
     private ArrayTupleBuilder tb;
     private DataOutput dos;
 
-    private BTree btree;
+    private ITreeIndex index;
     private RangePredicate rangePred;
     private MultiComparator lowKeySearchCmp;
-    private ITreeIndexCursor cursor;
-    private ITreeIndexFrame cursorFrame;
-    protected ITreeIndexAccessor indexAccessor;
+    private IIndexCursor cursor;
+    protected IIndexAccessor indexAccessor;
 
     private RecordDescriptor recDesc;
     private final RecordDescriptor inputRecDesc;
@@ -86,7 +82,7 @@
     }
 
     protected void setCursor() {
-        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+        cursor = indexAccessor.createSearchCursor();
     }
 
     @Override
@@ -95,26 +91,25 @@
 
         try {
             treeIndexOpHelper.open();
-            btree = (BTree) treeIndexOpHelper.getIndexInstance();
-            cursorFrame = btree.getLeafFrameFactory().createFrame();
-            setCursor();
+            index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
             writer.open();
 
             rangePred = new RangePredicate(null, null, true, true, null, null);
-            int lowKeySearchFields = btree.getComparatorFactories().length;
+            int lowKeySearchFields = index.getComparatorFactories().length;
             IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
             for (int i = 0; i < lowKeySearchFields; i++) {
-                lowKeySearchComparators[i] = btree.getComparatorFactories()[i].createBinaryComparator();
+                lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
             }
             lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
 
             writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
-            tb = new ArrayTupleBuilder(btree.getFieldCount());
+            tb = new ArrayTupleBuilder(index.getFieldCount());
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
-            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            setCursor();
 
             /** set the search cursor */
             rangePred.setLowKey(null, true);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorDescriptor.java
similarity index 93%
rename from pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
rename to pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorDescriptor.java
index 784288f..f651d68 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorDescriptor.java
@@ -33,7 +33,7 @@
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 
-public class BTreeSearchFunctionUpdateOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+public class TreeSearchFunctionUpdateOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
 
@@ -52,7 +52,7 @@
 
     private final int outputArity;
 
-    public BTreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+    public TreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
             IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -84,7 +84,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new BTreeSearchFunctionUpdateOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
+        return new TreeSearchFunctionUpdateOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward,
                 lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, functionFactory, preHookFactory,
                 postHookFactory, inputRdFactory, outputArity);
     }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
similarity index 76%
rename from pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
rename to pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index 0216b52..13aab31 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -19,6 +19,7 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -27,13 +28,10 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
@@ -48,7 +46,7 @@
 import edu.uci.ics.pregelix.dataflow.util.SearchKeyTupleReference;
 import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
 
-public class BTreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
+public class TreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
     protected TreeIndexDataflowHelper treeIndexHelper;
     protected FrameTupleAccessor accessor;
 
@@ -57,7 +55,7 @@
     protected ArrayTupleBuilder tb;
     protected DataOutput dos;
 
-    protected BTree btree;
+    protected ITreeIndex index;
     protected boolean isForward;
     protected PermutingFrameTupleReference lowKey;
     protected PermutingFrameTupleReference highKey;
@@ -66,9 +64,11 @@
     protected RangePredicate rangePred;
     protected MultiComparator lowKeySearchCmp;
     protected MultiComparator highKeySearchCmp;
-    protected ITreeIndexCursor cursor;
+    protected IIndexCursor cursor;
     protected ITreeIndexFrame cursorFrame;
-    protected ITreeIndexAccessor indexAccessor;
+    protected IIndexAccessor indexAccessor;
+    protected int[] lowKeyFields;
+    protected int[] highKeyFields;
 
     protected RecordDescriptor recDesc;
 
@@ -78,7 +78,7 @@
     private final UpdateBuffer updateBuffer;
     private final SearchKeyTupleReference tempTupleReference = new SearchKeyTupleReference();
 
-    public BTreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
+    public TreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
             IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
@@ -88,6 +88,8 @@
         this.isForward = isForward;
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
+        this.lowKeyFields = lowKeyFields;
+        this.highKeyFields = highKeyFields;
         this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         if (lowKeyFields != null && lowKeyFields.length > 0) {
             lowKey = new PermutingFrameTupleReference();
@@ -114,25 +116,46 @@
 
         try {
             treeIndexHelper.open();
-            btree = (BTree) treeIndexHelper.getIndexInstance();
-            cursorFrame = btree.getLeafFrameFactory().createFrame();
-            setCursor();
+            index = (ITreeIndex) treeIndexHelper.getIndexInstance();
+            cursorFrame = index.getLeafFrameFactory().createFrame();
 
             // Construct range predicate.
-            lowKeySearchCmp = BTreeUtils.getSearchMultiComparator(btree.getComparatorFactories(), lowKey);
-            highKeySearchCmp = BTreeUtils.getSearchMultiComparator(btree.getComparatorFactories(), highKey);
+            int lowKeySearchFields = index.getComparatorFactories().length;
+            int highKeySearchFields = index.getComparatorFactories().length;
+            if (lowKey != null)
+                lowKeySearchFields = lowKey.getFieldCount();
+            if (highKey != null)
+                highKeySearchFields = highKey.getFieldCount();
+
+            IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
+            for (int i = 0; i < lowKeySearchFields; i++) {
+                lowKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
+            }
+            lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
+
+            if (lowKeySearchFields == highKeySearchFields) {
+                highKeySearchCmp = lowKeySearchCmp;
+            } else {
+                IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
+                for (int i = 0; i < highKeySearchFields; i++) {
+                    highKeySearchComparators[i] = index.getComparatorFactories()[i].createBinaryComparator();
+                }
+                highKeySearchCmp = new MultiComparator(highKeySearchComparators);
+            }
+
             rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
                     highKeySearchCmp);
 
             writeBuffer = treeIndexHelper.getTaskContext().allocateFrame();
-            tb = new ArrayTupleBuilder(btree.getFieldCount());
+            tb = new ArrayTupleBuilder(index.getFieldCount());
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(treeIndexHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
-            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            setCursor();
 
-            cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
-            updateBuffer.setFieldCount(btree.getFieldCount());
+            cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
+            updateBuffer.setFieldCount(index.getFieldCount());
         } catch (Exception e) {
             treeIndexHelper.close();
             throw new HyracksDataException(e);
@@ -140,7 +163,7 @@
     }
 
     protected void setCursor() {
-        cursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, false);
+        cursor = indexAccessor.createSearchCursor();
     }
 
     protected void writeSearchResults() throws Exception {
@@ -184,7 +207,7 @@
             try {
                 cursor.close();
                 //batch update
-                updateBuffer.updateBTree(indexAccessor);
+                updateBuffer.updateIndex(indexAccessor);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
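
The hunk above inlines the comparator construction that BTreeUtils.getSearchMultiComparator used to perform, since that utility lives in the B-tree package this class no longer imports. The same low-/high-key loop now appears in several pushables; a small helper of the kind the in-code TODO asks for might look like this (hypothetical class and method names, assuming MultiComparator's 0.2.7 package):

    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
    import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;

    public final class SearchComparators {
        private SearchComparators() {
        }

        // Builds a MultiComparator over the first numKeyFields factories,
        // mirroring the low-/high-key loops in the hunks above.
        public static MultiComparator create(IBinaryComparatorFactory[] factories, int numKeyFields) {
            IBinaryComparator[] cmps = new IBinaryComparator[numKeyFields];
            for (int i = 0; i < numKeyFields; i++) {
                cmps[i] = factories[i].createBinaryComparator();
            }
            return new MultiComparator(cmps);
        }
    }

With it, the low-key comparator above reduces to SearchComparators.create(index.getComparatorFactories(), lowKey == null ? index.getComparatorFactories().length : lowKey.getFieldCount()).
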
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index b141eaf..8709301 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -19,15 +19,15 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 
 public class CopyUpdateUtil {
 
     public static void copyUpdate(SearchKeyTupleReference tempTupleReference, ITupleReference frameTuple,
-            UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, ITreeIndexAccessor indexAccessor,
-            ITreeIndexCursor cursor, RangePredicate rangePred) throws HyracksDataException, IndexException {
+            UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, IIndexAccessor indexAccessor,
+            IIndexCursor cursor, RangePredicate rangePred) throws HyracksDataException, IndexException {
         if (cloneUpdateTb.getSize() > 0) {
             int[] fieldEndOffsets = cloneUpdateTb.getFieldEndOffsets();
             int srcStart = fieldEndOffsets[0];
@@ -46,7 +46,7 @@
                 //release the cursor/latch
                 cursor.close();
                 //batch update
-                updateBuffer.updateBTree(indexAccessor);
+                updateBuffer.updateIndex(indexAccessor);
                 //try append the to-be-updated tuple again
                 if (!updateBuffer.appendTuple(cloneUpdateTb)) {
                     throw new HyracksDataException("cannot append tuple builder!");
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index 327178c..1ff3959 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -25,7 +25,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 
 /**
@@ -87,7 +87,7 @@
         }
     }
 
-    public void updateBTree(ITreeIndexAccessor bta) throws HyracksDataException, IndexException {
+    public void updateIndex(IIndexAccessor bta) throws HyracksDataException, IndexException {
         // batch update
         for (int i = 0; i <= currentInUse; i++) {
             ByteBuffer buffer = buffers.get(i);
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 962c9f6..2828451 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -1,18 +1,14 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- ! Copyright 2009-2013 by The Regents of the University of California 
+	! Licensed under the Apache License, Version 2.0 (the "License"); ! you may 
+	not use this file except in compliance with the License. ! you may obtain 
+	a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0 
+	! ! Unless required by applicable law or agreed to in writing, software ! 
+	distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT 
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the 
+	License for the specific language governing permissions and ! limitations 
+	under the License. ! -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow</artifactId>
 	<packaging>jar</packaging>
@@ -134,6 +130,13 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-lsm-common</artifactId>
+			<version>0.2.7-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
 			<version>0.2.7-SNAPSHOT</version>
 			<type>jar</type>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
index dc9698b..0f41568 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
@@ -25,4 +25,6 @@
 
     public Configuration createConfiguration(IHyracksTaskContext ctx) throws HyracksDataException;
 
+    public Configuration createConfiguration() throws HyracksDataException;
+
 }
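The new zero-argument createConfiguration() lets the Hadoop Configuration be materialized where no IHyracksTaskContext exists yet, e.g. inside connector factories such as the VertexPartitionComputerFactory added later in this patch. A hedged sketch of how an implementation might back the new variant, assuming the factory carries the serialized job conf; the class and field names below are illustrative, not the actual Pregelix ConfigurationFactory:

```java
// Illustrative implementation sketch; SerializedConfigurationFactory and its
// confBytes field are assumptions, not the actual Pregelix class.
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;

import org.apache.hadoop.conf.Configuration;

public class SerializedConfigurationFactory {
    private final byte[] confBytes; // captured via Configuration.write(...) at submit time

    public SerializedConfigurationFactory(byte[] confBytes) {
        this.confBytes = confBytes;
    }

    // The context-free variant added by this hunk.
    public Configuration createConfiguration() throws Exception {
        Configuration conf = new Configuration();
        // Configuration implements Writable, so it can rehydrate itself.
        conf.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)));
        return conf;
    }
}
```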
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index b86691c..2121ede 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -31,6 +31,9 @@
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -52,6 +55,7 @@
     private final ILocalResourceRepository localResourceRepository;
     private final ResourceIdFactory resourceIdFactory;
     private final IBufferCache bufferCache;
+    private final IVirtualBufferCache vBufferCache;
     private final IFileMapManager fileMapManager;
     private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
     private final Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
@@ -76,8 +80,11 @@
         bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
                 new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000,
                 threadFactory);
+        int numPagesInMemComponents = numPages / 4;
+        vBufferCache = new MultitenantVirtualBufferCache(new VirtualBufferCache(new HeapBufferAllocator(), pageSize,
+                numPagesInMemComponents));
         ioManager = (IOManager) appCtx.getRootContext().getIOManager();
-        lcManager = new IndexLifecycleManager();
+        lcManager = new IndexLifecycleManager(numPagesInMemComponents * pageSize * 2);
         localResourceRepository = new TransientLocalResourceRepository();
         resourceIdFactory = new ResourceIdFactory(0);
     }
@@ -110,6 +117,10 @@
         return bufferCache;
     }
 
+    public IVirtualBufferCache getVirtualBufferCache() {
+        return vBufferCache;
+    }
+
     public IFileMapProvider getFileMapManager() {
         return fileMapManager;
     }
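The sizing in this hunk is worth spelling out: the virtual buffer cache backing LSM in-memory components is carved out as a quarter of the buffer-cache page budget, and the index lifecycle manager's memory budget is set to twice that component size, presumably headroom for a mutable component plus one being flushed. A quick worked example; the formulas come from the patch, the concrete numbers are assumptions:

```java
// Worked example of the RuntimeContext sizing above; pageSize and numPages
// are illustrative values, only the formulas come from the patch.
public class MemoryBudgetExample {
    public static void main(String[] args) {
        int pageSize = 32 * 1024; // example: 32 KB pages
        int numPages = 4096;      // example: buffer-cache pages

        int numPagesInMemComponents = numPages / 4;                         // 1024 pages
        long lifecycleBudget = (long) numPagesInMemComponents * pageSize * 2;

        System.out.println("vbc pages:        " + numPagesInMemComponents); // 1024
        System.out.println("lifecycle budget: " + lifecycleBudget);         // 67108864 (64 MB)
    }
}
```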
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 69c5612..15117a1 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
 import edu.uci.ics.pregelix.example.GraphMutationVertex;
@@ -80,6 +81,7 @@
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
         job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -132,6 +134,7 @@
         job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
         job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -230,11 +233,12 @@
         job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
         job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setMutationOrVariableSizedUpdateHeavy(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
-    
+
     private static void generateMaximalCliqueJob2(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(MaximalCliqueVertex.class);
@@ -243,11 +247,12 @@
         job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
         job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setMutationOrVariableSizedUpdateHeavy(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH4);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
-    
+
     private static void generateMaximalCliqueJob3(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(MaximalCliqueVertex.class);
@@ -256,6 +261,8 @@
         job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
         job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+        job.setMutationOrVariableSizedUpdateHeavy(true);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH5);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
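Two new job knobs are exercised above: a pluggable VertexPartitioner and a hint that the job is mutation- or variable-sized-update-heavy. Judging by the regenerated XML fixtures below, these surface as pregelix.partitionerClass and pregelix.updateIntensive respectively. A condensed sketch of the same wiring follows; vertex/input/output format classes are elided, and the storage-plan effect of the update-heavy flag is an inference, not stated in this diff:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;

public class UpdateHeavyJobSketch {
    public static PregelixJob build(String input, String output) throws Exception {
        PregelixJob job = new PregelixJob("update-heavy-example");
        // ... setVertexClass / input / output format classes as in the generator above ...
        job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
        job.setMutationOrVariableSizedUpdateHeavy(true); // -> pregelix.updateIntensive=true
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job;
    }
}
```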
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index 199efab..decbde8 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 46444b3..cca66bb 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -137,6 +122,7 @@
 <property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleConnectedComponentsVertexOutputFormat</value></property>
 <property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
 <property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
index 47db410..d5ec8f1 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
index c08853b..b4c42e6 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -95,6 +80,7 @@
 <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
 <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
 <property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
index aafabee..6cf075b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -95,6 +80,7 @@
 <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
 <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
 <property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
index ee13335..49e2e6f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -95,6 +80,7 @@
 <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
 <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
 <property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
@@ -135,6 +121,7 @@
 <property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex$MaximalCliqueVertexOutputFormat</value></property>
 <property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
 <property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index 5f03027..65e0b30 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index 9ab92ff..9e1e0b0 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 314ca55..c4366d7 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -137,6 +122,7 @@
 <property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
 <property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
 <property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
index f8a2055..c05a4da 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index f1c72fa..ac0d508 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
index 427c0ea..225429a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
index 7e96163..bd9da92 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index 823e8b0..9acd7bc 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index 4ae102b..6c25575 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
index 228a526..4a40a6a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
@@ -1,19 +1,4 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<configuration>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/only.txt b/pregelix/pregelix-example/src/test/resources/only.txt
index e69de29..560c7d7 100644
--- a/pregelix/pregelix-example/src/test/resources/only.txt
+++ b/pregelix/pregelix-example/src/test/resources/only.txt
@@ -0,0 +1 @@
+Page
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index ae9f47e..54e2256 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -148,6 +148,13 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-lsm-common</artifactId>
+			<version>0.2.7-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-control-cc</artifactId>
 			<version>0.2.7-SNAPSHOT</version>
 			<type>jar</type>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/VirtualBufferCacheProvider.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/VirtualBufferCacheProvider.java
new file mode 100644
index 0000000..ec51047
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/VirtualBufferCacheProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+/**
+ * The virtual buffer cache provider
+ * 
+ * @author yingyib
+ */
+public class VirtualBufferCacheProvider implements IVirtualBufferCacheProvider {
+
+    private static final long serialVersionUID = 1L;
+
+    public VirtualBufferCacheProvider() {
+    }
+
+    @Override
+    public synchronized IVirtualBufferCache getVirtualBufferCache(IHyracksTaskContext ctx) {
+        return RuntimeContext.get(ctx).getVirtualBufferCache();
+    }
+}
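Because the provider is serializable and only touches RuntimeContext inside getVirtualBufferCache, it can be baked into a job specification on the client and resolved lazily on each node controller. A small usage sketch, using only types that appear in this patch:

```java
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;

public class VbcResolutionSketch {
    // Created client-side, shipped with the job, dereferenced at task-open time.
    public static IVirtualBufferCache resolve(IHyracksTaskContext ctx) {
        IVirtualBufferCacheProvider provider = new VirtualBufferCacheProvider();
        return provider.getVirtualBufferCache(ctx); // node-wide multitenant cache
    }
}
```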
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
new file mode 100644
index 0000000..88577c2
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/NoOpUpdateFunctionFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+/**
+ * No operation update function factory
+ * 
+ * @author yingyib
+ */
+public class NoOpUpdateFunctionFactory implements IUpdateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+    public static final NoOpUpdateFunctionFactory INSTANCE = new NoOpUpdateFunctionFactory();
+
+    private NoOpUpdateFunctionFactory() {
+
+    }
+
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writer)
+                    throws HyracksDataException {
+
+            }
+
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+
+            }
+
+        };
+    }
+
+}
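The factory makes the open/process/update/close lifecycle of IUpdateFunction explicit while doing nothing at each step, which is useful wherever the operator pipeline requires an update function but the job performs no index writes. A minimal driver exercising that lifecycle; the nulls are acceptable only because this particular implementation ignores every argument:

```java
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
import edu.uci.ics.pregelix.runtime.function.NoOpUpdateFunctionFactory;

public class NoOpLifecycleSketch {
    public static void main(String[] args) throws Exception {
        IUpdateFunction fn = NoOpUpdateFunctionFactory.INSTANCE.createFunction();
        fn.open(null, null, (IFrameWriter[]) null); // no ctx or writers needed here
        fn.process(new Object[] { "tuple" });       // ignored
        fn.update(null, null);                      // never mutates the index
        fn.close();
    }
}
```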
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexPartitionComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexPartitionComputerFactory.java
new file mode 100644
index 0000000..acccabb
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexPartitionComputerFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+
+/**
+ * The vertex-based partition computer factory.
+ * It is used to support customized graph partitioning functions.
+ * 
+ * @author yingyib
+ */
+public class VertexPartitionComputerFactory implements ITuplePartitionComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+
+    public VertexPartitionComputerFactory(IConfigurationFactory confFactory) {
+        this.confFactory = confFactory;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public ITuplePartitionComputer createPartitioner() {
+        try {
+            final Configuration conf = confFactory.createConfiguration();
+            return new ITuplePartitionComputer() {
+                private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+                private final DataInputStream dis = new DataInputStream(bbis);
+                private final VertexPartitioner partitioner = BspUtils.createVertexPartitioner(conf);
+                private final WritableComparable vertexId = BspUtils.createVertexIndex(conf);
+
+                @SuppressWarnings("unchecked")
+                public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                    try {
+                        int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                                + accessor.getFieldStartOffset(tIndex, 0);
+                        bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+                        vertexId.readFields(dis);
+                        return Math.abs(partitioner.getPartitionId(vertexId, nParts) % nParts);
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            };
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
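For completeness, here is what a partitioner pluggable through this factory might look like. The abstract shape of edu.uci.ics.pregelix.api.graph.VertexPartitioner is inferred from the single partitioner.getPartitionId(vertexId, nParts) call above, so the class signature below is an assumption:

```java
import org.apache.hadoop.io.VLongWritable;

import edu.uci.ics.pregelix.api.graph.VertexPartitioner;

// Assumes VertexPartitioner is an abstract class parameterized by the vertex
// id type and exposing getPartitionId(id, nParts); inferred, not confirmed.
public class ModuloVertexPartitioner extends VertexPartitioner<VLongWritable> {
    @Override
    public int getPartitionId(VLongWritable vertexId, int nParts) {
        // Any int is fine: the factory defensively applies Math.abs(... % nParts).
        return (int) (vertexId.get() % nParts);
    }
}
```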