1. Let hyracks-hdfs support the new Hadoop API.
2. Let Pregelix use the hyracks-hdfs functionality.
3. Increase the degree of parallelism in tests.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2816 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index efdbd41..0b1be61 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -74,9 +74,9 @@
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.core.util.DatatypeHelper;
-import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
@@ -86,462 +86,390 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
 
 public abstract class JobGen implements IJobGen {
-	private static final Logger LOGGER = Logger.getLogger(JobGen.class
-			.getName());
-	protected static final int MB = 1048576;
-	protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
-	protected static final int tableSize = 10485767;
-	protected static final String PRIMARY_INDEX = "primary";
-	protected final Configuration conf;
-	protected final PregelixJob giraphJob;
-	protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
-	protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
-	protected String jobId = new UUID(System.currentTimeMillis(),
-			System.nanoTime()).toString();
-	protected int frameSize = ClusterConfig.getFrameSize();
-	protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+    private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
+    protected static final int MB = 1048576;
+    protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+    protected static final int tableSize = 10485767;
+    protected static final String PRIMARY_INDEX = "primary";
+    protected final Configuration conf;
+    protected final PregelixJob giraphJob;
+    protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+    protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+    protected int frameSize = ClusterConfig.getFrameSize();
+    protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
 
-	protected static final String SECONDARY_INDEX_ODD = "secondary1";
-	protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+    protected static final String SECONDARY_INDEX_ODD = "secondary1";
+    protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
-	public JobGen(PregelixJob job) {
-		this.conf = job.getConfiguration();
-		this.giraphJob = job;
-		this.initJobConfiguration();
-		job.setJobId(jobId);
-		
-		//set the frame size to be the one user specified if the user did specify.
-		int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
-		if (specifiedFrameSize > 0) {
-			frameSize = specifiedFrameSize;
-			maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
-		}
-		if (maxFrameNumber <= 0) {
-			maxFrameNumber = 1;
-		}
-	}
+    public JobGen(PregelixJob job) {
+        this.conf = job.getConfiguration();
+        this.giraphJob = job;
+        this.initJobConfiguration();
+        job.setJobId(jobId);
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	private void initJobConfiguration() {
-		Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS,
-				Vertex.class);
-		List<Type> parameterTypes = ReflectionUtils.getTypeArguments(
-				Vertex.class, vertexClass);
-		Type vertexIndexType = parameterTypes.get(0);
-		Type vertexValueType = parameterTypes.get(1);
-		Type edgeValueType = parameterTypes.get(2);
-		Type messageValueType = parameterTypes.get(3);
-		conf.setClass(PregelixJob.VERTEX_INDEX_CLASS,
-				(Class<?>) vertexIndexType, WritableComparable.class);
-		conf.setClass(PregelixJob.VERTEX_VALUE_CLASS,
-				(Class<?>) vertexValueType, Writable.class);
-		conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType,
-				Writable.class);
-		conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS,
-				(Class<?>) messageValueType, Writable.class);
+        // Use the frame size the user specified, if one was provided.
+        int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
+        if (specifiedFrameSize > 0) {
+            frameSize = specifiedFrameSize;
+            maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+        }
+        if (maxFrameNumber <= 0) {
+            maxFrameNumber = 1;
+        }
+    }
 
-		Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
-		if (!aggregatorClass.equals(GlobalAggregator.class)) {
-			List<Type> argTypes = ReflectionUtils.getTypeArguments(
-					GlobalAggregator.class, aggregatorClass);
-			Type partialAggregateValueType = argTypes.get(4);
-			conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS,
-					(Class<?>) partialAggregateValueType, Writable.class);
-			Type finalAggregateValueType = argTypes.get(5);
-			conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS,
-					(Class<?>) finalAggregateValueType, Writable.class);
-		}
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void initJobConfiguration() {
+        Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
+        List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
+        Type vertexIndexType = parameterTypes.get(0);
+        Type vertexValueType = parameterTypes.get(1);
+        Type edgeValueType = parameterTypes.get(2);
+        Type messageValueType = parameterTypes.get(3);
+        conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
+        conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
+        conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
+        conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
 
-		Class combinerClass = BspUtils.getMessageCombinerClass(conf);
-		if (!combinerClass.equals(MessageCombiner.class)) {
-			List<Type> argTypes = ReflectionUtils.getTypeArguments(
-					MessageCombiner.class, combinerClass);
-			Type partialCombineValueType = argTypes.get(2);
-			conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS,
-					(Class<?>) partialCombineValueType, Writable.class);
-		}
-	}
+        Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
+        if (!aggregatorClass.equals(GlobalAggregator.class)) {
+            List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
+            Type partialAggregateValueType = argTypes.get(4);
+            conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, (Class<?>) partialAggregateValueType,
+                    Writable.class);
+            Type finalAggregateValueType = argTypes.get(5);
+            conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
+        }
 
-	public String getJobId() {
-		return jobId;
-	}
+        Class combinerClass = BspUtils.getMessageCombinerClass(conf);
+        if (!combinerClass.equals(MessageCombiner.class)) {
+            List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
+            Type partialCombineValueType = argTypes.get(2);
+            conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
+        }
+    }
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public JobSpecification generateCreatingJob() throws HyracksException {
-		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
-				.getVertexIndexClass(conf);
-		JobSpecification spec = new JobSpecification();
-		ITypeTraits[] typeTraits = new ITypeTraits[2];
-		typeTraits[0] = new TypeTraits(false);
-		typeTraits[1] = new TypeTraits(false);
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
-				WritableComparator.get(vertexIdClass).getClass());
+    public String getJobId() {
+        return jobId;
+    }
 
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, PRIMARY_INDEX);
-		TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(
-				spec, storageManagerInterface, treeRegistryProvider,
-				fileSplitProvider, typeTraits, comparatorFactories,
-				new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE);
-		ClusterConfig.setLocationConstraint(spec, btreeCreate);
-		return spec;
-	}
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public JobSpecification generateCreatingJob() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        JobSpecification spec = new JobSpecification();
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public JobSpecification generateLoadingJob() throws HyracksException {
-		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
-				.getVertexIndexClass(conf);
-		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-		JobSpecification spec = new JobSpecification();
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, btreeCreate);
+        return spec;
+    }
 
-		/**
-		 * the graph file scan operator and use count constraint first, will use
-		 * absolute constraint later
-		 */
-		VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
-		List<InputSplit> splits = new ArrayList<InputSplit>();
-		try {
-			splits = inputFormat.getSplits(giraphJob,
-					fileSplitProvider.getFileSplits().length);
-			LOGGER.info("number of splits: " + splits.size());
-			for (InputSplit split : splits)
-				LOGGER.info(split.toString());
-		} catch (Exception e) {
-			throw new HyracksDataException(e);
-		}
-		RecordDescriptor recordDescriptor = DataflowUtils
-				.getRecordDescriptorFromKeyValueClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-		VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
-				spec, recordDescriptor, splits, confFactory);
-		ClusterConfig.setLocationConstraint(spec, scanner, splits);
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public JobSpecification generateLoadingJob() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
-		/**
-		 * construct sort operator
-		 */
-		int[] sortFields = new int[1];
-		sortFields[0] = 0;
-		INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
-				.getAscINormalizedKeyComputerFactory(vertexIdClass);
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
-				WritableComparator.get(vertexIdClass).getClass());
-		ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-				spec, maxFrameNumber, sortFields, nkmFactory,
-				comparatorFactories, recordDescriptor);
-		ClusterConfig.setLocationConstraint(spec, sorter);
+        /**
+         * construct the graph file scan operator; split locations are
+         * assigned by the HDFS read scheduler
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+            LOGGER.info("number of splits: " + splits.size());
+            for (InputSplit split : splits)
+                LOGGER.info(split.toString());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+                readSchedule, confFactory);
+        ClusterConfig.setLocationConstraint(spec, scanner);
 
-		/**
-		 * construct tree bulk load operator
-		 */
-		int[] fieldPermutation = new int[2];
-		fieldPermutation[0] = 0;
-		fieldPermutation[1] = 1;
-		ITypeTraits[] typeTraits = new ITypeTraits[2];
-		typeTraits[0] = new TypeTraits(false);
-		typeTraits[1] = new TypeTraits(false);
-		TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, storageManagerInterface, treeRegistryProvider,
-				fileSplitProvider, typeTraits, comparatorFactories,
-				fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
-				new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE);
-		ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        /**
+         * construct sort operator
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+                .getAscINormalizedKeyComputerFactory(vertexIdClass);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
+                nkmFactory, comparatorFactories, recordDescriptor);
+        ClusterConfig.setLocationConstraint(spec, sorter);
 
-		/**
-		 * connect operator descriptors
-		 */
-		ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-				DatatypeHelper.createSerializerDeserializer(vertexIdClass));
-		spec.connect(new MToNPartitioningConnectorDescriptor(spec,
-				hashPartitionComputerFactory), scanner, 0, sorter, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
-				btreeBulkLoad, 0);
-		return spec;
-	}
+        /**
+         * construct tree bulk load operator
+         */
+        int[] fieldPermutation = new int[2];
+        fieldPermutation[0] = 0;
+        fieldPermutation[1] = 1;
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
-	@Override
-	public JobSpecification generateJob(int iteration) throws HyracksException {
-		if (iteration <= 0)
-			throw new IllegalStateException(
-					"iteration number cannot be less than 1");
-		if (iteration == 1)
-			return generateFirstIteration(iteration);
-		else
-			return generateNonFirstIteration(iteration);
-	}
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+        return spec;
+    }
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public JobSpecification scanSortPrintGraph(String nodeName, String path)
-			throws HyracksException {
-		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
-				.getVertexIndexClass(conf);
-		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-		int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
-		JobSpecification spec = new JobSpecification();
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, PRIMARY_INDEX);
+    @Override
+    public JobSpecification generateJob(int iteration) throws HyracksException {
+        if (iteration <= 0)
+            throw new IllegalStateException("iteration number cannot be less than 1");
+        if (iteration == 1)
+            return generateFirstIteration(iteration);
+        else
+            return generateNonFirstIteration(iteration);
+    }
 
-		/**
-		 * the graph file scan operator and use count constraint first, will use
-		 * absolute constraint later
-		 */
-		VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
-		List<InputSplit> splits = new ArrayList<InputSplit>();
-		try {
-			splits = inputFormat.getSplits(giraphJob,
-					fileSplitProvider.getFileSplits().length);
-		} catch (Exception e) {
-			throw new HyracksDataException(e);
-		}
-		RecordDescriptor recordDescriptor = DataflowUtils
-				.getRecordDescriptorFromKeyValueClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-		VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
-				spec, recordDescriptor, splits, confFactory);
-		PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner,
-				splits.size());
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+        JobSpecification spec = new JobSpecification();
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
-		/**
-		 * construct sort operator
-		 */
-		int[] sortFields = new int[1];
-		sortFields[0] = 0;
-		INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
-				.getAscINormalizedKeyComputerFactory(vertexIdClass);
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
-				WritableComparator.get(vertexIdClass).getClass());
-		ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-				spec, maxFrameLimit, sortFields, nkmFactory,
-				comparatorFactories, recordDescriptor);
-		PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter,
-				splits.size());
+        /**
+         * construct the graph file scan operator; split locations are
+         * assigned by the HDFS read scheduler
+         */
+        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try {
+            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+                readSchedule, confFactory);
+        ClusterConfig.setLocationConstraint(spec, scanner);
 
-		/**
-		 * construct write file operator
-		 */
-		FileSplit resultFile = new FileSplit(nodeName, new FileReference(
-				new File(path)));
-		FileSplit[] results = new FileSplit[1];
-		results[0] = resultFile;
-		IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
-				results);
-		IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
-		IRecordDescriptorFactory inputRdFactory = DataflowUtils
-				.getWritableRecordDescriptorFactoryFromWritableClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
-				spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
-				null);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
-				new String[] { "nc1" });
-		PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+        /**
+         * construct sort operator
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+                .getAscINormalizedKeyComputerFactory(vertexIdClass);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
+                nkmFactory, comparatorFactories, recordDescriptor);
+        ClusterConfig.setLocationConstraint(spec, sorter);
 
-		/**
-		 * connect operator descriptors
-		 */
-		ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-				DatatypeHelper.createSerializerDeserializer(vertexIdClass));
-		spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter,
-				0);
-		spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
-				hashPartitionComputerFactory, sortFields, comparatorFactories),
-				sorter, 0, writer, 0);
-		return spec;
-	}
+        /**
+         * construct write file operator
+         */
+        FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
+                resultFileSplitProvider, preHookFactory, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public JobSpecification scanIndexPrintGraph(String nodeName, String path)
-			throws HyracksException {
-		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
-				.getVertexIndexClass(conf);
-		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-		JobSpecification spec = new JobSpecification();
+        /**
+         * connect operator descriptors
+         */
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+                comparatorFactories), sorter, 0, writer, 0);
+        return spec;
+    }
 
-		/**
-		 * construct empty tuple operator
-		 */
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-		DataOutput dos = tb.getDataOutput();
-		tb.reset();
-		UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-		tb.addFieldEndOffset();
-		ISerializerDeserializer[] keyRecDescSers = {
-				UTF8StringSerializerDeserializer.INSTANCE,
-				UTF8StringSerializerDeserializer.INSTANCE };
-		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-		ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
-				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
-				tb.getSize());
-		ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
 
-		/**
-		 * construct btree search operator
-		 */
-		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-		RecordDescriptor recordDescriptor = DataflowUtils
-				.getRecordDescriptorFromKeyValueClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
-				WritableComparator.get(vertexIdClass).getClass());
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, PRIMARY_INDEX);
-		ITypeTraits[] typeTraits = new ITypeTraits[2];
-		typeTraits[0] = new TypeTraits(false);
-		typeTraits[1] = new TypeTraits(false);
-		BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
-				spec, recordDescriptor, storageManagerInterface,
-				treeRegistryProvider, fileSplitProvider, typeTraits,
-				comparatorFactories, null, null, true, true,
-				new BTreeDataflowHelperFactory(), false,
-				NoOpOperationCallbackProvider.INSTANCE);
-		ClusterConfig.setLocationConstraint(spec, scanner);
+        /**
+         * construct empty tuple operator
+         */
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
 
-		/**
-		 * construct write file operator
-		 */
-		FileSplit resultFile = new FileSplit(nodeName, new FileReference(
-				new File(path)));
-		FileSplit[] results = new FileSplit[1];
-		results[0] = resultFile;
-		IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
-				results);
-		IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
-		IRecordDescriptorFactory inputRdFactory = DataflowUtils
-				.getWritableRecordDescriptorFactoryFromWritableClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
-				spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
-				null);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
-				new String[] { "nc1" });
-		PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+        /**
+         * construct btree search operator
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, scanner);
 
-		/**
-		 * connect operator descriptors
-		 */
-		int[] sortFields = new int[1];
-		sortFields[0] = 0;
-		ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-				DatatypeHelper.createSerializerDeserializer(vertexIdClass));
-		spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
-				0, scanner, 0);
-		spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
-				hashPartitionComputerFactory, sortFields, comparatorFactories),
-				scanner, 0, writer, 0);
-		spec.setFrameSize(frameSize);
-		return spec;
-	}
+        /**
+         * construct write file operator
+         */
+        FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
+                resultFileSplitProvider, preHookFactory, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public JobSpecification scanIndexWriteGraph() throws HyracksException {
-		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
-				.getVertexIndexClass(conf);
-		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-		JobSpecification spec = new JobSpecification();
+        /**
+         * connect operator descriptors
+         */
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+                comparatorFactories), scanner, 0, writer, 0);
+        spec.setFrameSize(frameSize);
+        return spec;
+    }
 
-		/**
-		 * construct empty tuple operator
-		 */
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-		DataOutput dos = tb.getDataOutput();
-		tb.reset();
-		UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-		tb.addFieldEndOffset();
-		ISerializerDeserializer[] keyRecDescSers = {
-				UTF8StringSerializerDeserializer.INSTANCE,
-				UTF8StringSerializerDeserializer.INSTANCE };
-		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-		ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
-				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
-				tb.getSize());
-		ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public JobSpecification scanIndexWriteGraph() throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
 
-		/**
-		 * construct btree search operator
-		 */
-		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-		RecordDescriptor recordDescriptor = DataflowUtils
-				.getRecordDescriptorFromKeyValueClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
-				WritableComparator.get(vertexIdClass).getClass());
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        /**
+         * construct empty tuple operator
+         */
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
 
-		ITypeTraits[] typeTraits = new ITypeTraits[2];
-		typeTraits[0] = new TypeTraits(false);
-		typeTraits[1] = new TypeTraits(false);
-		BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
-				spec, recordDescriptor, storageManagerInterface,
-				treeRegistryProvider, fileSplitProvider, typeTraits,
-				comparatorFactories, null, null, true, true,
-				new BTreeDataflowHelperFactory(), false,
-				NoOpOperationCallbackProvider.INSTANCE);
-		ClusterConfig.setLocationConstraint(spec, scanner);
+        /**
+         * construct btree search operator
+         */
+        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+                .getClass());
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
-		/**
-		 * construct write file operator
-		 */
-		IRecordDescriptorFactory inputRdFactory = DataflowUtils
-				.getWritableRecordDescriptorFactoryFromWritableClasses(
-						vertexIdClass.getName(), vertexClass.getName());
-		HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(
-				spec, confFactory, inputRdFactory);
-		ClusterConfig.setLocationConstraint(spec, writer);
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, scanner);
 
-		/**
-		 * connect operator descriptors
-		 */
-		spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
-				0, scanner, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer,
-				0);
-		return spec;
-	}
+        /**
+         * construct write file operator
+         */
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                vertexIdClass.getName(), vertexClass.getName());
+        HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+        ClusterConfig.setLocationConstraint(spec, writer);
 
-	/***
-	 * drop the sindex
-	 * 
-	 * @return JobSpecification
-	 * @throws HyracksException
-	 */
-	protected JobSpecification dropIndex(String indexName)
-			throws HyracksException {
-		JobSpecification spec = new JobSpecification();
+        /**
+         * connect operator descriptors
+         */
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+        return spec;
+    }
 
-		IFileSplitProvider fileSplitProvider = ClusterConfig
-				.getFileSplitProvider(jobId, indexName);
-		TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(
-				spec, storageManagerInterface, treeRegistryProvider,
-				fileSplitProvider);
+    /**
+     * drop the index with the given name
+     * 
+     * @return JobSpecification
+     * @throws HyracksException
+     */
+    protected JobSpecification dropIndex(String indexName) throws HyracksException {
+        JobSpecification spec = new JobSpecification();
 
-		ClusterConfig.setLocationConstraint(spec, drop);
-		spec.addRoot(drop);
-		return spec;
-	}
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+        TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
+                treeRegistryProvider, fileSplitProvider);
 
-	/** generate non-first iteration job */
-	protected abstract JobSpecification generateNonFirstIteration(int iteration)
-			throws HyracksException;
+        ClusterConfig.setLocationConstraint(spec, drop);
+        spec.addRoot(drop);
+        return spec;
+    }
 
-	/** generate first iteration job */
-	protected abstract JobSpecification generateFirstIteration(int iteration)
-			throws HyracksException;
+    /** generate non-first iteration job */
+    protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
 
-	/** generate clean-up job */
-	public abstract JobSpecification[] generateCleanup()
-			throws HyracksException;
+    /** generate first iteration job */
+    protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+
+    /** generate clean-up job */
+    public abstract JobSpecification[] generateCleanup() throws HyracksException;
 
 }
\ No newline at end of file
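
The functional change buried in the JobGen reformatting is the scan wiring: instead of deriving the scan's placement from a partition count, the loading and print jobs now ask the hyracks-hdfs scheduler for a per-split read schedule and let ClusterConfig pin the scanner accordingly. A minimal sketch of the new pattern, using only calls that appear in the hunk above (variable names are illustrative):

    // Ask the HDFS scheduler which NC should read each input split,
    // then hand the schedule to the scan operator.
    List<InputSplit> splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
    String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
    VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor,
            splits, readSchedule, confFactory);
    ClusterConfig.setLocationConstraint(spec, scanner);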
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 8eadab9..d26e637 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -40,6 +40,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;
 
 public class ClusterConfig {
 
@@ -49,6 +50,7 @@
     private static Properties clusterProperties = new Properties();
     private static Map<String, List<String>> ipToNcMapping;
     private static String[] stores;
+    private static Scheduler hdfsScheduler;
 
     /**
      * let tests set config path to be whatever
@@ -211,6 +213,8 @@
                 NCs[i] = entry.getKey();
                 i++;
             }
+
+            hdfsScheduler = new Scheduler(ipAddress, port);
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }
@@ -218,4 +222,8 @@
         loadClusterProperties();
         loadStores();
     }
+
+    public static Scheduler getHdfsScheduler() {
+        return hdfsScheduler;
+    }
 }
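
ClusterConfig now owns a single hdfs2 Scheduler, constructed while the IP-to-NC mapping is loaded, so every job-generation path shares one view of the cluster when mapping HDFS splits to nodes; callers reach it through the static getHdfsScheduler() accessor, as in the JobGen sketch above.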
diff --git a/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..f89dd79 100644
--- a/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
       <value>20</value>
    </property>
    <property>
-      <name>mapred.min.split.size</name>
-      <value>65536</value>
+      <name>mapred.max.split.size</name>
+      <value>4096</value>
    </property>
 
 </configuration>
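
The split-size flip is what raises the degree of parallelism in the tests: the old config put a 64 KB floor on split size, which kept each small test input to a single split, while the new 4096-byte cap shatters them. For example, a 64 KB file that previously formed one split now yields roughly 16, so the vertex scan runs with that many partitions; the pregelix-example config in the final hunk goes further with a 128-byte cap.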
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 36b0eb8..6c039bf 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -144,5 +144,12 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
 	</dependencies>
 </project>
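
The new compile-scope dependency lets the pregelix-dataflow-std module build against hyracks-hdfs 0.2.3-SNAPSHOT, the artifact that ships the hdfs2 scheduler used elsewhere in this patch.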
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index f1b98f5..0736e1d 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -17,6 +17,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 import java.util.logging.Logger;
 
@@ -55,15 +56,20 @@
     private final List<InputSplit> splits;
     private final IConfigurationFactory confFactory;
     private final int fieldSize = 2;
+    private final String[] scheduledLocations;
+    private final boolean[] executed;
 
     /**
      * @param spec
      */
     public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
-            IConfigurationFactory confFactory) throws HyracksException {
+            String[] scheduledLocations, IConfigurationFactory confFactory) throws HyracksException {
         super(spec, 0, 1);
         this.splits = splits;
         this.confFactory = confFactory;
+        this.scheduledLocations = scheduledLocations;
+        this.executed = new boolean[scheduledLocations.length];
+        Arrays.fill(executed, false);
         this.recordDescriptors[0] = rd;
     }
 
@@ -78,7 +84,21 @@
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                     writer.open();
-                    loadVertices(ctx, partition);
+                    for (int i = 0; i < scheduledLocations.length; i++) {
+                        if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
+                            /**
+                             * claim this split if no sibling thread has taken it
+                             */
+                            synchronized (executed) {
+                                if (!executed[i]) {
+                                    executed[i] = true;
+                                } else {
+                                    continue;
+                                }
+                            }
+                            loadVertices(ctx, i);
+                        }
+                    }
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
@@ -104,13 +124,13 @@
 
                 VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
                 TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
-                InputSplit split = splits.get(partition);
+                InputSplit split = splits.get(partitionId);
 
                 if (split instanceof FileSplit) {
                     FileSplit fileSplit = (FileSplit) split;
                     LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
                             + " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
-                            + partition);
+                            + partitionId);
                 }
                 VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
                 vertexReader.initialize(split, context);
@@ -122,7 +142,7 @@
                  * set context
                  */
                 Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
-                        splits.get(partition));
+                        splits.get(partitionId));
                 Vertex.setContext(mapperContext);
 
                 /**
@@ -166,5 +186,4 @@
             }
         };
     }
-
 }
\ No newline at end of file
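
The rewritten scan body implements a claim-once pattern: every scan thread on a node walks the full schedule, and the shared executed[] array ensures each split scheduled for that node is loaded by exactly one of them. Distilled into a sketch (this restates the hunk above; it is not a drop-in replacement):

    // Each node-local thread claims any split scheduled for its node that
    // no sibling thread has taken yet.
    String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
    for (int i = 0; i < scheduledLocations.length; i++) {
        if (!scheduledLocations[i].equals(nodeId)) {
            continue; // split i is scheduled on another node
        }
        synchronized (executed) {
            if (executed[i]) {
                continue; // a sibling thread already claimed split i
            }
            executed[i] = true;
        }
        loadVertices(ctx, i); // read the split outside the lock
    }

Marking the flag inside the monitor but reading the split outside it keeps the critical section tiny, so concurrent partitions on the same node can overlap their HDFS reads.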
diff --git a/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..71450f1 100644
--- a/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
       <value>20</value>
    </property>
    <property>
-      <name>mapred.min.split.size</name>
-      <value>65536</value>
+      <name>mapred.max.split.size</name>
+      <value>128</value>
    </property>
 
 </configuration>