1. let hyracks-hdfs support the new Hadoop API
2. let Pregelix use hyracks-hdfs functionality
3. increase the degree of parallelism in tests

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2816 123451ca-8445-de46-9d55-352943316053
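
The intended wiring of the new hyracks-hdfs2 operators is sketched below. This is an illustrative
usage example rather than code from this patch: it assumes a cluster controller reachable at
"cc-host":1098, a text input under /input on HDFS, an output directory /output, and user-supplied
recordDescriptor, myParserFactory (IKeyValueParserFactory) and myWriterFactory (ITupleWriterFactory)
objects; exception handling and imports are omitted.

    // describe the input with the new (org.apache.hadoop.mapreduce) API
    Job job = new Job();
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("/input"));
    List<InputSplit> splits = new TextInputFormat().getSplits(job);

    // ask the hdfs2 scheduler for a data-local read schedule: one NC name per split
    Scheduler scheduler = new Scheduler("cc-host", 1098);
    String[] readSchedule = scheduler.getLocationConstraints(splits);

    // wire the read and write operators into a Hyracks job
    JobSpecification spec = new JobSpecification();
    HDFSReadOperatorDescriptor reader = new HDFSReadOperatorDescriptor(spec, recordDescriptor, job,
            splits.toArray(new InputSplit[splits.size()]), readSchedule, myParserFactory);
    PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, reader, readSchedule);

    HDFSWriteOperatorDescriptor writer = new HDFSWriteOperatorDescriptor(spec, job, "/output",
            myWriterFactory);
    PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, readSchedule);

    spec.connect(new OneToOneConnectorDescriptor(spec), reader, 0, writer, 0);
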
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 69936a4..bc44db8 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -108,7 +108,6 @@
                             synchronized (executed) {
                                 if (executed[i] == false) {
                                     executed[i] = true;
-                                    System.out.println("thread " + Thread.currentThread().getId() + " setting " + i);
                                 } else {
                                     continue;
                                 }
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
new file mode 100644
index 0000000..d843d27
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ConfFactory implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private byte[] confBytes;
+
+    public ConfFactory(Job conf) throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+            conf.getConfiguration().write(dos);
+            confBytes = bos.toByteArray();
+            dos.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public Job getConf() throws HyracksDataException {
+        try {
+            Job conf = new Job();
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(confBytes));
+            conf.getConfiguration().readFields(dis);
+            dis.close();
+            return conf;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
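
The motivation for ConfFactory: the new-API Job and its Configuration are not Java-serializable,
while Hyracks operator descriptors are shipped to node controllers through Java serialization, so
the configuration is captured as Writable bytes on the client and rebuilt on each node. A minimal
round-trip sketch (the input path is a placeholder, not from this patch):

    Job job = new Job();
    FileInputFormat.setInputPaths(job, new Path("/input"));  // configured on the client
    ConfFactory confFactory = new ConfFactory(job);          // configuration serialized to a byte[]
    // ... confFactory travels inside a serialized operator descriptor ...
    Job rebuilt = confFactory.getConf();                     // re-created at the node controller
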
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
new file mode 100644
index 0000000..54d60c0
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+@SuppressWarnings("rawtypes")
+public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final ConfFactory confFactory;
+    private final InputSplit[] inputSplits;
+    private final String[] scheduledLocations;
+    private final IKeyValueParserFactory tupleParserFactory;
+    private final boolean[] executed;
+
+    /**
+     * The constructor of HDFSReadOperatorDescriptor.
+     * 
+     * @param spec
+     *            the JobSpecification object
+     * @param rd
+     *            the output record descriptor
+     * @param conf
+     *            the Hadoop Job object, which contains the input format and the input paths
+     * @param splits
+     *            the array of InputSplits (HDFS chunks).
+     * @param scheduledLocations
+     *            the node controller names to scan the InputSplits, which is a one-to-one mapping. The String array
+     *            is obtained from edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler.getLocationConstraints(List<InputSplit>).
+     * @param tupleParserFactory
+     *            the IKeyValueParserFactory implementation instance.
+     * @throws HyracksException
+     */
+    public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, Job conf, InputSplit[] splits,
+            String[] scheduledLocations, IKeyValueParserFactory tupleParserFactory) throws HyracksException {
+        super(spec, 0, 1);
+        try {
+            this.inputSplits = splits;
+            this.confFactory = new ConfFactory(conf);
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+        this.scheduledLocations = scheduledLocations;
+        this.executed = new boolean[scheduledLocations.length];
+        Arrays.fill(executed, false);
+        this.tupleParserFactory = tupleParserFactory;
+        this.recordDescriptors[0] = rd;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        final Job conf = confFactory.getConf();
+
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void initialize() throws HyracksDataException {
+                try {
+                    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                    IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
+                    writer.open();
+                    InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(),
+                            conf.getConfiguration());
+                    for (int i = 0; i < inputSplits.length; i++) {
+                        /**
+                         * read all the partitions scheduled to the current node
+                         */
+                        if (scheduledLocations[i].equals(nodeName)) {
+                            /**
+                             * pick an unread split to read
+                             * synchronize among simultaneous partitions on the same machine
+                             */
+                            synchronized (executed) {
+                                if (executed[i] == false) {
+                                    executed[i] = true;
+                                } else {
+                                    continue;
+                                }
+                            }
+
+                            /**
+                             * read the split
+                             */
+                            TaskAttemptContext context = new TaskAttemptContext(conf.getConfiguration(),
+                                    new TaskAttemptID());
+                            RecordReader reader = inputFormat.createRecordReader(inputSplits[i], context);
+                            reader.initialize(inputSplits[i], context);
+                            while (reader.nextKeyValue() == true) {
+                                parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer);
+                            }
+                        }
+                    }
+                    parser.flush(writer);
+                    writer.close();
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
new file mode 100644
index 0000000..441815b
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private ConfFactory confFactory;
+    private ITupleWriterFactory tupleWriterFactory;
+    private String outputPath;
+
+    /**
+     * The constructor of HDFSWriteOperatorDescriptor.
+     * 
+     * @param spec
+     *            the JobSpecification object
+     * @param conf
+     *            the Hadoop Job whose configuration is used to access HDFS
+     * @param outputPath
+     *            the output path on HDFS
+     * @param tupleWriterFactory
+     *            the ITupleWriterFactory implementation object
+     * @throws HyracksException
+     */
+    public HDFSWriteOperatorDescriptor(JobSpecification spec, Job conf, String outputPath,
+            ITupleWriterFactory tupleWriterFactory) throws HyracksException {
+        super(spec, 1, 0);
+        this.confFactory = new ConfFactory(conf);
+        this.tupleWriterFactory = tupleWriterFactory;
+        this.outputPath = outputPath;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        final Job conf = confFactory.getConf();
+
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+            private String fileName = outputPath + File.separator + "part-" + partition;
+            private FSDataOutputStream dos;
+            private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
+            private FrameTupleReference tuple = new FrameTupleReference();
+            private ITupleWriter tupleWriter;
+
+            @Override
+            public void open() throws HyracksDataException {
+                tupleWriter = tupleWriterFactory.getTupleWriter();
+                try {
+                    FileSystem dfs = FileSystem.get(conf.getConfiguration());
+                    dos = dfs.create(new Path(fileName), true);
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                accessor.reset(buffer);
+                int tupleCount = accessor.getTupleCount();
+                for (int i = 0; i < tupleCount; i++) {
+                    tuple.reset(accessor, i);
+                    tupleWriter.write(dos, tuple);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    dos.close();
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+        };
+    }
+}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
new file mode 100644
index 0000000..5b70a4e
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.scheduler;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+/**
+ * The scheduler conducts data-local scheduling for data on HDFS
+ */
+public class Scheduler {
+
+    /** a list of NCs */
+    private String[] NCs;
+
+    /** a map from ip to NCs */
+    private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
+
+    /** a map from the NC name to the index */
+    private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+
+    /**
+     * The constructor of the scheduler
+     * 
+     * @param ipAddress
+     *            the IP address of the Hyracks cluster controller
+     * @param port
+     *            the client port of the Hyracks cluster controller
+     * @throws HyracksException
+     */
+    public Scheduler(String ipAddress, int port) throws HyracksException {
+        try {
+            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+            loadIPAddressToNCMap(ncNameToNcInfos);
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+        loadIPAddressToNCMap(ncNameToNcInfos);
+    }
+
+    /**
+     * Compute absolute location constraints for a file scan operator, given a list of file splits
+     * 
+     * @throws HyracksException
+     */
+    public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
+        int[] capacity = new int[NCs.length];
+        Arrays.fill(capacity, 0);
+        String[] locations = new String[splits.size()];
+        int slots = splits.size() % capacity.length == 0 ? (splits.size() / capacity.length) : (splits.size()
+                / capacity.length + 1);
+
+        try {
+            Random random = new Random(System.currentTimeMillis());
+            boolean scheduled[] = new boolean[splits.size()];
+            Arrays.fill(scheduled, false);
+
+            for (int i = 0; i < splits.size(); i++) {
+                /**
+                 * get the location of all the splits
+                 */
+                String[] loc = splits.get(i).getLocations();
+                if (loc.length > 0) {
+                    for (int j = 0; j < loc.length; j++) {
+                        /**
+                         * get all the IP addresses from the name
+                         */
+                        InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
+                        /**
+                         * iterate over all the IP addresses
+                         */
+                        for (InetAddress ip : allIps) {
+                            /**
+                             * if the node controller exists
+                             */
+                            if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+                                /**
+                                 * set the ncs
+                                 */
+                                List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+                                int arrayPos = random.nextInt(dataLocations.size());
+                                String nc = dataLocations.get(arrayPos);
+                                int pos = ncNameToIndex.get(nc);
+                                /**
+                                 * check if the node is already full
+                                 */
+                                if (capacity[pos] < slots) {
+                                    locations[i] = nc;
+                                    capacity[pos]++;
+                                    scheduled[i] = true;
+                                }
+                            }
+                        }
+
+                        /**
+                         * break the loop for data-locations if the schedule has already been found
+                         */
+                        if (scheduled[i] == true) {
+                            break;
+                        }
+                    }
+                }
+            }
+
+            /**
+             * find the lowest-indexed NC that still has available capacity
+             */
+            int currentAvailableNC = 0;
+            for (int i = 0; i < capacity.length; i++) {
+                if (capacity[i] < slots) {
+                    currentAvailableNC = i;
+                    break;
+                }
+            }
+
+            /**
+             * schedule non-local file reads
+             */
+            for (int i = 0; i < splits.size(); i++) {
+                // if there is no data-local NC choice, choose a random one
+                if (!scheduled[i]) {
+                    locations[i] = NCs[currentAvailableNC];
+                    capacity[currentAvailableNC]++;
+                    scheduled[i] = true;
+
+                    /**
+                     * move the available NC cursor to the next one
+                     */
+                    for (int j = currentAvailableNC; j < capacity.length; j++) {
+                        if (capacity[j] < slots) {
+                            currentAvailableNC = j;
+                            break;
+                        }
+                    }
+                }
+            }
+            return locations;
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    /**
+     * Load the IP-address-to-NC map from the NCNameToNCInfoMap
+     * 
+     * @param ncNameToNcInfos
+     * @throws HyracksException
+     */
+    private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+        try {
+            NCs = new String[ncNameToNcInfos.size()];
+            int i = 0;
+
+            /**
+             * build the IP address to NC map
+             */
+            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
+                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
+                        .getHostAddress();
+                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
+                if (matchedNCs == null) {
+                    matchedNCs = new ArrayList<String>();
+                    ipToNcMapping.put(ipAddr, matchedNCs);
+                }
+                matchedNCs.add(entry.getKey());
+                NCs[i] = entry.getKey();
+                i++;
+            }
+
+            /**
+             * set up the NC name to index mapping
+             */
+            for (i = 0; i < NCs.length; i++) {
+                ncNameToIndex.put(NCs[i], i);
+            }
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+}
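
To illustrate the capacity bound used by the scheduler (numbers chosen for illustration only): with
4 splits and 3 node controllers, slots = 4 / 3 + 1 = 2, so each NC is assigned at most 2 splits in
the data-local pass; any split whose replica hosts are already full falls through to the second
pass and is assigned to the lowest-indexed NC that still has a free slot.
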
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml b/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
index a8680ec..39b6505 100644
--- a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,7 +18,7 @@
       <value>20</value>
    </property>
    <property>
-      <name>mapred.min.split.size</name>
+      <name>mapred.max.split.size</name>
       <value>2048</value>
    </property>
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index efdbd41..0b1be61 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -74,9 +74,9 @@
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.core.util.DatatypeHelper;
-import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
@@ -86,462 +86,390 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
 
 public abstract class JobGen implements IJobGen {
-	private static final Logger LOGGER = Logger.getLogger(JobGen.class
-			.getName());
-	protected static final int MB = 1048576;
-	protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
-	protected static final int tableSize = 10485767;
-	protected static final String PRIMARY_INDEX = "primary";
-	protected final Configuration conf;
-	protected final PregelixJob giraphJob;
-	protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
-	protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
-	protected String jobId = new UUID(System.currentTimeMillis(),
-			System.nanoTime()).toString();
-	protected int frameSize = ClusterConfig.getFrameSize();
-	protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+    private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
+    protected static final int MB = 1048576;
+    protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+    protected static final int tableSize = 10485767;
+    protected static final String PRIMARY_INDEX = "primary";
+    protected final Configuration conf;
+    protected final PregelixJob giraphJob;
+    protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+    protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+    protected int frameSize = ClusterConfig.getFrameSize();
+    protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
 
-	protected static final String SECONDARY_INDEX_ODD = "secondary1";
-	protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+    protected static final String SECONDARY_INDEX_ODD = "secondary1";
+    protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
-	public JobGen(PregelixJob job) {
-		this.conf = job.getConfiguration();
-		this.giraphJob = job;
-		this.initJobConfiguration();
-		job.setJobId(jobId);
-		
-		//set the frame size to be the one user specified if the user did specify.
-		int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
-		if (specifiedFrameSize > 0) {
-			frameSize = specifiedFrameSize;
-			maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
-		}
-		if (maxFrameNumber <= 0) {
-			maxFrameNumber = 1;
-		}
-	}
+    public JobGen(PregelixJob job) {
+        this.conf = job.getConfiguration();
+        this.giraphJob = job;
+        this.initJobConfiguration();
+        job.setJobId(jobId);
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	private void initJobConfiguration() {
-		Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS,
-				Vertex.class);
-		List<Type> parameterTypes = ReflectionUtils.getTypeArguments(
-				Vertex.class, vertexClass);
-		Type vertexIndexType = parameterTypes.get(0);
-		Type vertexValueType = parameterTypes.get(1);
-		Type edgeValueType = parameterTypes.get(2);
-		Type messageValueType = parameterTypes.get(3);
-		conf.setClass(PregelixJob.VERTEX_INDEX_CLASS,
-				(Class<?>) vertexIndexType, WritableComparable.class);
-		conf.setClass(PregelixJob.VERTEX_VALUE_CLASS,
-				(Class<?>) vertexValueType, Writable.class);
-		conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType,
-				Writable.class);
-		conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS,
-				(Class<?>) messageValueType, Writable.class);
+        // set the frame size to the user-specified value, if the user provided one
+        int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
+        if (specifiedFrameSize > 0) {
+            frameSize = specifiedFrameSize;
+            maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+        }
+        if (maxFrameNumber <= 0) {
+            maxFrameNumber = 1;
+        }
+    }
 
-		Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
-		if (!aggregatorClass.equals(GlobalAggregator.class)) {
-			List<Type> argTypes = ReflectionUtils.getTypeArguments(
-					GlobalAggregator.class, aggregatorClass);
-			Type partialAggregateValueType = argTypes.get(4);
-			conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS,
-					(Class<?>) partialAggregateValueType, Writable.class);
-			Type finalAggregateValueType = argTypes.get(5);
-			conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS,
-					(Class<?>) finalAggregateValueType, Writable.class);
-		}
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void initJobConfiguration() {
+        Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
+        List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
+        Type vertexIndexType = parameterTypes.get(0);
+        Type vertexValueType = parameterTypes.get(1);
+        Type edgeValueType = parameterTypes.get(2);
+        Type messageValueType = parameterTypes.get(3);
+        conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
+        conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
+        conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
+        conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
 
-		Class combinerClass = BspUtils.getMessageCombinerClass(conf);
-		if (!combinerClass.equals(MessageCombiner.class)) {
-			List<Type> argTypes = ReflectionUtils.getTypeArguments(
-					MessageCombiner.class, combinerClass);
-			Type partialCombineValueType = argTypes.get(2);
-			conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS,
-					(Class<?>) partialCombineValueType, Writable.class);
-		}
-	}
+        Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
+        if (!aggregatorClass.equals(GlobalAggregator.class)) {
+            List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
+            Type partialAggregateValueType = argTypes.get(4);
+            conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, (Class<?>) partialAggregateValueType,
+                    Writable.class);
+            Type finalAggregateValueType = argTypes.get(5);
+            conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
+        }
 
-	public String getJobId() {
-		return jobId;
-	}
+        Class combinerClass = BspUtils.getMessageCombinerClass(conf);
+        if (!combinerClass.equals(MessageCombiner.class)) {
+            List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
+            Type partialCombineValueType = argTypes.get(2);
+            conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
+        }
+    }
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public JobSpecification generateCreatingJob() throws HyracksException {
-		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
-				.getVertexIndexClass(conf);
-		JobSpecification spec = new JobSpecification();
-		ITypeTraits[] typeTraits = new ITypeTraits[2];
-		typeTraits[0] = new TypeTraits(false);
-		typeTraits[1] = new TypeTraits(false);
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
-				WritableComparator.get(vertexIdClass).getClass());
+    public String getJobId() {
+        return jobId;
+    }
 
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, PRIMARY_INDEX);
-		TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(
-				spec, storageManagerInterface, treeRegistryProvider,
-				fileSplitProvider, typeTraits, comparatorFactories,
-				new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE);
-		ClusterConfig.setLocationConstraint(spec, btreeCreate);
-		return spec;
-	}
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public JobSpecification generateCreatingJob() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        JobSpecification spec = new JobSpecification();
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public JobSpecification generateLoadingJob() throws HyracksException {
-		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
-				.getVertexIndexClass(conf);
-		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-		JobSpecification spec = new JobSpecification();
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, btreeCreate);
+        return spec;
+    }
 
-		/**
-		 * the graph file scan operator and use count constraint first, will use
-		 * absolute constraint later
-		 */
-		VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
-		List<InputSplit> splits = new ArrayList<InputSplit>();
-		try {
-			splits = inputFormat.getSplits(giraphJob,
-					fileSplitProvider.getFileSplits().length);
-			LOGGER.info("number of splits: " + splits.size());
-			for (InputSplit split : splits)
-				LOGGER.info(split.toString());
-		} catch (Exception e) {
-			throw new HyracksDataException(e);
-		}
-		RecordDescriptor recordDescriptor = DataflowUtils
-				.getRecordDescriptorFromKeyValueClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-		VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
-				spec, recordDescriptor, splits, confFactory);
-		ClusterConfig.setLocationConstraint(spec, scanner, splits);
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public JobSpecification generateLoadingJob() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
-		/**
-		 * construct sort operator
-		 */
-		int[] sortFields = new int[1];
-		sortFields[0] = 0;
-		INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
-				.getAscINormalizedKeyComputerFactory(vertexIdClass);
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
-				WritableComparator.get(vertexIdClass).getClass());
-		ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-				spec, maxFrameNumber, sortFields, nkmFactory,
-				comparatorFactories, recordDescriptor);
-		ClusterConfig.setLocationConstraint(spec, sorter);
+        /**
+         * the graph file scan operator: splits are read according to the absolute
+         * location constraints computed by the HDFS scheduler
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+            LOGGER.info("number of splits: " + splits.size());
+            for (InputSplit split : splits)
+                LOGGER.info(split.toString());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+                readSchedule, confFactory);
+        ClusterConfig.setLocationConstraint(spec, scanner);
 
-		/**
-		 * construct tree bulk load operator
-		 */
-		int[] fieldPermutation = new int[2];
-		fieldPermutation[0] = 0;
-		fieldPermutation[1] = 1;
-		ITypeTraits[] typeTraits = new ITypeTraits[2];
-		typeTraits[0] = new TypeTraits(false);
-		typeTraits[1] = new TypeTraits(false);
-		TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, storageManagerInterface, treeRegistryProvider,
-				fileSplitProvider, typeTraits, comparatorFactories,
-				fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
-				new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE);
-		ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        /**
+         * construct sort operator
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+                .getAscINormalizedKeyComputerFactory(vertexIdClass);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
+                nkmFactory, comparatorFactories, recordDescriptor);
+        ClusterConfig.setLocationConstraint(spec, sorter);
 
-		/**
-		 * connect operator descriptors
-		 */
-		ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-				DatatypeHelper.createSerializerDeserializer(vertexIdClass));
-		spec.connect(new MToNPartitioningConnectorDescriptor(spec,
-				hashPartitionComputerFactory), scanner, 0, sorter, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
-				btreeBulkLoad, 0);
-		return spec;
-	}
+        /**
+         * construct tree bulk load operator
+         */
+        int[] fieldPermutation = new int[2];
+        fieldPermutation[0] = 0;
+        fieldPermutation[1] = 1;
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
-	@Override
-	public JobSpecification generateJob(int iteration) throws HyracksException {
-		if (iteration <= 0)
-			throw new IllegalStateException(
-					"iteration number cannot be less than 1");
-		if (iteration == 1)
-			return generateFirstIteration(iteration);
-		else
-			return generateNonFirstIteration(iteration);
-	}
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+        return spec;
+    }
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public JobSpecification scanSortPrintGraph(String nodeName, String path)
-			throws HyracksException {
-		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
-				.getVertexIndexClass(conf);
-		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-		int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
-		JobSpecification spec = new JobSpecification();
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, PRIMARY_INDEX);
+    @Override
+    public JobSpecification generateJob(int iteration) throws HyracksException {
+        if (iteration <= 0)
+            throw new IllegalStateException("iteration number cannot be less than 1");
+        if (iteration == 1)
+            return generateFirstIteration(iteration);
+        else
+            return generateNonFirstIteration(iteration);
+    }
 
-		/**
-		 * the graph file scan operator and use count constraint first, will use
-		 * absolute constraint later
-		 */
-		VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
-		List<InputSplit> splits = new ArrayList<InputSplit>();
-		try {
-			splits = inputFormat.getSplits(giraphJob,
-					fileSplitProvider.getFileSplits().length);
-		} catch (Exception e) {
-			throw new HyracksDataException(e);
-		}
-		RecordDescriptor recordDescriptor = DataflowUtils
-				.getRecordDescriptorFromKeyValueClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-		VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
-				spec, recordDescriptor, splits, confFactory);
-		PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner,
-				splits.size());
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+        JobSpecification spec = new JobSpecification();
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
-		/**
-		 * construct sort operator
-		 */
-		int[] sortFields = new int[1];
-		sortFields[0] = 0;
-		INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
-				.getAscINormalizedKeyComputerFactory(vertexIdClass);
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
-				WritableComparator.get(vertexIdClass).getClass());
-		ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-				spec, maxFrameLimit, sortFields, nkmFactory,
-				comparatorFactories, recordDescriptor);
-		PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter,
-				splits.size());
+        /**
+         * the graph file scan operator: splits are read according to the absolute
+         * location constraints computed by the HDFS scheduler
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+                readSchedule, confFactory);
+        ClusterConfig.setLocationConstraint(spec, scanner);
 
-		/**
-		 * construct write file operator
-		 */
-		FileSplit resultFile = new FileSplit(nodeName, new FileReference(
-				new File(path)));
-		FileSplit[] results = new FileSplit[1];
-		results[0] = resultFile;
-		IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
-				results);
-		IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
-		IRecordDescriptorFactory inputRdFactory = DataflowUtils
-				.getWritableRecordDescriptorFactoryFromWritableClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
-				spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
-				null);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
-				new String[] { "nc1" });
-		PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+        /**
+         * construct sort operator
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+                .getAscINormalizedKeyComputerFactory(vertexIdClass);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
+                nkmFactory, comparatorFactories, recordDescriptor);
+        ClusterConfig.setLocationConstraint(spec, sorter);
 
-		/**
-		 * connect operator descriptors
-		 */
-		ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-				DatatypeHelper.createSerializerDeserializer(vertexIdClass));
-		spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter,
-				0);
-		spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
-				hashPartitionComputerFactory, sortFields, comparatorFactories),
-				sorter, 0, writer, 0);
-		return spec;
-	}
+        /**
+         * construct write file operator
+         */
+        FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
+                resultFileSplitProvider, preHookFactory, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public JobSpecification scanIndexPrintGraph(String nodeName, String path)
-			throws HyracksException {
-		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
-				.getVertexIndexClass(conf);
-		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-		JobSpecification spec = new JobSpecification();
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+                comparatorFactories), sorter, 0, writer, 0);
+        return spec;
+    }
 
-		/**
-		 * construct empty tuple operator
-		 */
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-		DataOutput dos = tb.getDataOutput();
-		tb.reset();
-		UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-		tb.addFieldEndOffset();
-		ISerializerDeserializer[] keyRecDescSers = {
-				UTF8StringSerializerDeserializer.INSTANCE,
-				UTF8StringSerializerDeserializer.INSTANCE };
-		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-		ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
-				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
-				tb.getSize());
-		ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
 
-		/**
-		 * construct btree search operator
-		 */
-		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-		RecordDescriptor recordDescriptor = DataflowUtils
-				.getRecordDescriptorFromKeyValueClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
-				WritableComparator.get(vertexIdClass).getClass());
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, PRIMARY_INDEX);
-		ITypeTraits[] typeTraits = new ITypeTraits[2];
-		typeTraits[0] = new TypeTraits(false);
-		typeTraits[1] = new TypeTraits(false);
-		BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
-				spec, recordDescriptor, storageManagerInterface,
-				treeRegistryProvider, fileSplitProvider, typeTraits,
-				comparatorFactories, null, null, true, true,
-				new BTreeDataflowHelperFactory(), false,
-				NoOpOperationCallbackProvider.INSTANCE);
-		ClusterConfig.setLocationConstraint(spec, scanner);
+        /**
+         * construct empty tuple operator
+         */
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
 
-		/**
-		 * construct write file operator
-		 */
-		FileSplit resultFile = new FileSplit(nodeName, new FileReference(
-				new File(path)));
-		FileSplit[] results = new FileSplit[1];
-		results[0] = resultFile;
-		IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
-				results);
-		IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
-		IRecordDescriptorFactory inputRdFactory = DataflowUtils
-				.getWritableRecordDescriptorFactoryFromWritableClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
-				spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
-				null);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
-				new String[] { "nc1" });
-		PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+        /**
+         * construct btree search operator
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, scanner);
 
-		/**
-		 * connect operator descriptors
-		 */
-		int[] sortFields = new int[1];
-		sortFields[0] = 0;
-		ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-				DatatypeHelper.createSerializerDeserializer(vertexIdClass));
-		spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
-				0, scanner, 0);
-		spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
-				hashPartitionComputerFactory, sortFields, comparatorFactories),
-				scanner, 0, writer, 0);
-		spec.setFrameSize(frameSize);
-		return spec;
-	}
+        /**
+         * construct write file operator
+         */
+        FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
+                resultFileSplitProvider, preHookFactory, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public JobSpecification scanIndexWriteGraph() throws HyracksException {
-		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
-				.getVertexIndexClass(conf);
-		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-		JobSpecification spec = new JobSpecification();
+        /**
+         * connect operator descriptors
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+                comparatorFactories), scanner, 0, writer, 0);
+        spec.setFrameSize(frameSize);
+        return spec;
+    }
 
-		/**
-		 * construct empty tuple operator
-		 */
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-		DataOutput dos = tb.getDataOutput();
-		tb.reset();
-		UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-		tb.addFieldEndOffset();
-		ISerializerDeserializer[] keyRecDescSers = {
-				UTF8StringSerializerDeserializer.INSTANCE,
-				UTF8StringSerializerDeserializer.INSTANCE };
-		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-		ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
-				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
-				tb.getSize());
-		ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanIndexWriteGraph() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
 
-		/**
-		 * construct btree search operator
-		 */
-		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-		RecordDescriptor recordDescriptor = DataflowUtils
-				.getRecordDescriptorFromKeyValueClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
-				WritableComparator.get(vertexIdClass).getClass());
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        /**
+         * construct empty tuple operator
+         */
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
 
-		ITypeTraits[] typeTraits = new ITypeTraits[2];
-		typeTraits[0] = new TypeTraits(false);
-		typeTraits[1] = new TypeTraits(false);
-		BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
-				spec, recordDescriptor, storageManagerInterface,
-				treeRegistryProvider, fileSplitProvider, typeTraits,
-				comparatorFactories, null, null, true, true,
-				new BTreeDataflowHelperFactory(), false,
-				NoOpOperationCallbackProvider.INSTANCE);
-		ClusterConfig.setLocationConstraint(spec, scanner);
+        /**
+         * construct btree search operator
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
-		/**
-		 * construct write file operator
-		 */
-		IRecordDescriptorFactory inputRdFactory = DataflowUtils
-				.getWritableRecordDescriptorFactoryFromWritableClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(
-				spec, confFactory, inputRdFactory);
-		ClusterConfig.setLocationConstraint(spec, writer);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, scanner);
 
-		/**
-		 * connect operator descriptors
-		 */
-		spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
-				0, scanner, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer,
-				0);
-		return spec;
-	}
+        /**
+         * construct write file operator
+         */
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+        ClusterConfig.setLocationConstraint(spec, writer);
 
-	/***
-	 * drop the sindex
-	 * 
-	 * @return JobSpecification
-	 * @throws HyracksException
-	 */
-	protected JobSpecification dropIndex(String indexName)
-			throws HyracksException {
-		JobSpecification spec = new JobSpecification();
+        /**
+         * connect operator descriptors
+         */
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+        return spec;
+    }
 
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, indexName);
-		TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(
-				spec, storageManagerInterface, treeRegistryProvider,
-				fileSplitProvider);
+    /**
+     * drop the index with the given name
+     * 
+     * @return JobSpecification
+     * @throws HyracksException
+     */
+    protected JobSpecification dropIndex(String indexName) throws HyracksException {
+        JobSpecification spec = new JobSpecification();
 
-		ClusterConfig.setLocationConstraint(spec, drop);
-		spec.addRoot(drop);
-		return spec;
-	}
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+        TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
+                treeRegistryProvider, fileSplitProvider);
 
-	/** generate non-first iteration job */
-	protected abstract JobSpecification generateNonFirstIteration(int iteration)
-			throws HyracksException;
+        ClusterConfig.setLocationConstraint(spec, drop);
+        spec.addRoot(drop);
+        return spec;
+    }
 
-	/** generate first iteration job */
-	protected abstract JobSpecification generateFirstIteration(int iteration)
-			throws HyracksException;
+    /** generate non-first iteration job */
+    protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
 
-	/** generate clean-up job */
-	public abstract JobSpecification[] generateCleanup()
-			throws HyracksException;
+    /** generate first iteration job */
+    protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+
+    /** generate clean-up job */
+    public abstract JobSpecification[] generateCleanup() throws HyracksException;
 
 }
\ No newline at end of file
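Note on the abstract hooks above: concrete job generators supply the per-iteration plans and the cleanup plan. A hedged sketch of how a subclass could implement generateCleanup() on top of the dropIndex(...) helper, assuming a single primary index; real generators may also drop secondary indexes:

    @Override
    public JobSpecification[] generateCleanup() throws HyracksException {
        // sketch only: drop just the primary B-tree; additional dropIndex(...) calls
        // would be added here for any secondary indexes the job created
        return new JobSpecification[] { dropIndex(PRIMARY_INDEX) };
    }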
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 8eadab9..d26e637 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -40,6 +40,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;
 
 public class ClusterConfig {
 
@@ -49,6 +50,7 @@
     private static Properties clusterProperties = new Properties();
     private static Map<String, List<String>> ipToNcMapping;
     private static String[] stores;
+    private static Scheduler hdfsScheduler;
 
     /**
      * let tests set config path to be whatever
@@ -211,6 +213,8 @@
                 NCs[i] = entry.getKey();
                 i++;
             }
+
+            hdfsScheduler = new Scheduler(ipAddress, port);
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }
@@ -218,4 +222,8 @@
         loadClusterProperties();
         loadStores();
     }
+
+    public static Scheduler getHdfsScheduler() {
+        return hdfsScheduler;
+    }
 }
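The static Scheduler added here is what lets Pregelix place HDFS scan partitions near their data. A hedged sketch of the intended call pattern inside a job generator: computeSplits(...) is a placeholder for however the input splits are obtained, getLocationConstraints(...) is assumed to be the hdfs2 Scheduler's scheduling entry point, and the scanner constructor matches the new VertexFileScanOperatorDescriptor signature further below:

    Scheduler scheduler = ClusterConfig.getHdfsScheduler();
    List<InputSplit> splits = computeSplits(conf);                            // placeholder
    String[] scheduledLocations = scheduler.getLocationConstraints(splits);   // assumed API
    VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec,
            recordDescriptor, splits, scheduledLocations, confFactory);
    PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanner, scheduledLocations);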
diff --git a/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..f89dd79 100644
--- a/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
       <value>20</value>
    </property>
    <property>
-      <name>mapred.min.split.size</name>
-      <value>65536</value>
+      <name>mapred.max.split.size</name>
+      <value>4096</value>
    </property>
 
 </configuration>
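Why a max instead of a min: with Hadoop's FileInputFormat (new mapreduce API) the effective split size is max(minSplitSize, min(maxSplitSize, blockSize)), so a 64 KB minimum keeps the tiny test inputs in a single split, while a 4 KB maximum slices them into many splits and therefore many scan partitions. The computation, as in FileInputFormat.computeSplitSize(...):

    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));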
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 36b0eb8..6c039bf 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -144,5 +144,12 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index f1b98f5..0736e1d 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -17,6 +17,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 import java.util.logging.Logger;
 
@@ -55,15 +56,20 @@
     private final List<InputSplit> splits;
     private final IConfigurationFactory confFactory;
     private final int fieldSize = 2;
+    private final String[] scheduledLocations;
+    private final boolean[] executed;
 
     /**
      * @param spec
      */
     public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
-            IConfigurationFactory confFactory) throws HyracksException {
+            String[] scheduledLocations, IConfigurationFactory confFactory) throws HyracksException {
         super(spec, 0, 1);
         this.splits = splits;
         this.confFactory = confFactory;
+        this.scheduledLocations = scheduledLocations;
+        this.executed = new boolean[scheduledLocations.length];
+        Arrays.fill(executed, false);
         this.recordDescriptors[0] = rd;
     }
 
@@ -78,7 +84,21 @@
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                     writer.open();
-                    loadVertices(ctx, partition);
+                    for (int i = 0; i < scheduledLocations.length; i++) {
+                        if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
+                            /**
+                             * claim one unprocessed split assigned to this node; other local threads skip it
+                             */
+                            synchronized (executed) {
+                                if (!executed[i]) {
+                                    executed[i] = true;
+                                } else {
+                                    continue;
+                                }
+                            }
+                            loadVertices(ctx, i);
+                        }
+                    }
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
@@ -104,13 +124,13 @@
 
                 VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
                 TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
-                InputSplit split = splits.get(partition);
+                InputSplit split = splits.get(partitionId);
 
                 if (split instanceof FileSplit) {
                     FileSplit fileSplit = (FileSplit) split;
                     LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
                             + " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
-                            + partition);
+                            + partitionId);
                 }
                 VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
                 vertexReader.initialize(split, context);
@@ -122,7 +142,7 @@
                  * set context
                  */
                 Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
-                        splits.get(partition));
+                        splits.get(partitionId));
                 Vertex.setContext(mapperContext);
 
                 /**
@@ -166,5 +186,4 @@
             }
         };
     }
-
 }
\ No newline at end of file
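Standalone illustration of the claiming protocol introduced in the scan operator above: every partition thread on a node walks the full schedule, and the shared executed[] array, guarded by a synchronized block, guarantees each split is loaded exactly once even when several local threads race for it. The node names and schedule below are made up for the demo:

    import java.util.Arrays;

    public final class SplitClaimDemo {
        public static void main(String[] args) throws InterruptedException {
            // a schedule of four splits, three of them assigned to node "nc1"
            final String[] scheduledLocations = { "nc1", "nc1", "nc2", "nc1" };
            final boolean[] executed = new boolean[scheduledLocations.length];
            Arrays.fill(executed, false);
            final String myNodeId = "nc1";

            Runnable worker = new Runnable() {
                public void run() {
                    for (int i = 0; i < scheduledLocations.length; i++) {
                        if (!scheduledLocations[i].equals(myNodeId)) {
                            continue; // split belongs to another node
                        }
                        synchronized (executed) {
                            if (executed[i]) {
                                continue; // another local thread already claimed this split
                            }
                            executed[i] = true;
                        }
                        System.out.println(Thread.currentThread().getName() + " loads split " + i);
                    }
                }
            };

            Thread t0 = new Thread(worker, "partition-0");
            Thread t1 = new Thread(worker, "partition-1");
            t0.start();
            t1.start();
            t0.join();
            t1.join();
            // splits 0, 1 and 3 are each printed exactly once, by whichever thread won the race
        }
    }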
diff --git a/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..71450f1 100644
--- a/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
       <value>20</value>
    </property>
    <property>
-      <name>mapred.min.split.size</name>
-      <value>65536</value>
+      <name>mapred.max.split.size</name>
+      <value>128</value>
    </property>
 
 </configuration>
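The example tests go further still: a 128-byte cap leaves only a handful of records per split, so every node and every scan partition gets work even on the small sample graphs. For reference, the same cap could be set programmatically through the new mapreduce API; the job variable here is assumed to be the org.apache.hadoop.mapreduce.Job being configured:

    // equivalent programmatic form of the XML setting above (sketch)
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMaxInputSplitSize(job, 128L);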