Merge fullstack_asterix_stabilization into fullstack_hyracks_result_distribution.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2965 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index bbe8fb3..63a6852 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -54,6 +54,8 @@
         operatorVisitedToParents.clear();
         builder.buildSpec(rootOps);
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        // Skip connector-policy-based activity cluster planning during scheduling; it is slow on large clusters.
+        spec.setUseConnectorPolicyForScheduling(false);
         return spec;
     }
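For reference, a minimal sketch of what a compiled job now requests, assuming hypothetical driver code that builds a JobSpecification by hand (only the two setters come from this file; everything else is illustrative):

    JobSpecification spec = new JobSpecification();
    // Connector policies are still assigned, so per-connector pipelining and
    // materialization decisions are unchanged...
    spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
    // ...but they are no longer fed into activity cluster planning during
    // scheduling, which is slow on large clusters.
    spec.setUseConnectorPolicyForScheduling(false);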
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
index 5c5fdb1..a8864fe 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
@@ -38,7 +38,6 @@
         if (context.checkIfInDontApplySet(this, op)) {
             return false;
         }
-        context.addToDontApplySet(this, op);
         if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
             return false;
         }
@@ -86,6 +85,7 @@
         opRef3.setValue(newGbyOp);
         typeGby(newGbyOp, context);
         typeGby(gbyOp, context);
+        context.addToDontApplySet(this, op);
         return true;
     }
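Moving addToDontApplySet from the precondition checks to the success path means an operator is only blacklisted for this rule after a combiner has actually been introduced; the early-exit paths (wrong operator tag, and so on) leave it eligible for a later optimizer pass. A minimal sketch of the resulting pattern (the method name, applicable(), and transform() are illustrative stand-ins for the rule body):

    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
            throws AlgebricksException {
        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
        if (context.checkIfInDontApplySet(this, op)) {
            return false; // this rule already fired on this operator
        }
        if (!applicable(op)) {
            return false; // no mark yet, so the rule may retry on a later pass
        }
        transform(opRef, context); // the actual rewrite
        context.addToDontApplySet(this, op); // mark only after a successful rewrite
        return true;
    }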
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 3e5e30f..d86f1d5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -17,7 +17,6 @@
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -47,11 +46,18 @@
     private final ISerializableTable table;
 	private final int tableSize;
     private final TuplePointer storedTuplePointer;
+    private final boolean reverseOutputOrder; // whether to swap the probe/build order of tuples written to the output
     
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
             FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table)
             throws HyracksDataException {
+        this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, false);
+    }
+    
+    public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
+            ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
+            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
+            ISerializableTable table, boolean reverse) throws HyracksDataException {
     	this.tableSize = tableSize;
        	this.table = table;
        	storedTuplePointer = new TuplePointer();
@@ -76,6 +82,7 @@
         } else {
             nullTupleBuild = null;
         }
+        reverseOutputOrder = reverse;
     }
 
     public void build(ByteBuffer buffer) throws HyracksDataException {
@@ -108,18 +115,13 @@
                 int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
                 if (c == 0) {
                     matchFound = true;
-                    if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
-                        flushFrame(outBuffer, writer);
-                        appender.reset(outBuffer, true);
-                        if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
-                            throw new IllegalStateException();
-                        }
-                    }
+                    appendToResult(i, tIndex, writer);
                 }
             } while (true);
 
             if (!matchFound && isLeftOuter) {
                 if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
                         nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
                     flushFrame(outBuffer, writer);
                     appender.reset(outBuffer, true);
@@ -128,6 +130,7 @@
                         throw new IllegalStateException();
                     }
                 }
             }
         }
     }
@@ -145,23 +148,25 @@
         buffer.position(0);
         buffer.limit(buffer.capacity());
     }
-
-    private static class Link {
-        private static final int INIT_POINTERS_SIZE = 8;
-
-        long[] pointers;
-        int size;
-
-        Link() {
-            pointers = new long[INIT_POINTERS_SIZE];
-            size = 0;
-        }
-
-        void add(long pointer) {
-            if (size >= pointers.length) {
-                pointers = Arrays.copyOf(pointers, pointers.length * 2);
+    
+    private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
+        if (!reverseOutputOrder) {
+            if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+                flushFrame(outBuffer, writer);
+                appender.reset(outBuffer, true);
+                if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+                    throw new IllegalStateException();
+                }
             }
-            pointers[size++] = pointer;
-        }
+        } else {
+            if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+                flushFrame(outBuffer, writer);
+                appender.reset(outBuffer, true);
+                if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+                    throw new IllegalStateException();
+                }
+            }
+        }
     }
 }
\ No newline at end of file
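The original ten-argument constructor now delegates with reverse = false, so existing call sites keep the probe-then-build output order; role-reversal call sites pass true so the joiner swaps the sides back when writing. A hypothetical call site (accessor and partition-computer names are illustrative; the signature is the one added above):

    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize,
            probeAccessor, probeHpc, buildAccessor, buildHpc,
            comparator, isLeftOuter, nullWriters1, table, true /* reverse */);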
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 19479df..cf39416 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -135,6 +136,7 @@
         recordDescriptors[0] = recordDescriptor;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
+        
 
     }
 
@@ -254,6 +256,7 @@
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
             
+            
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
                         .getJobId(), new TaskId(getActivityId(), partition));
@@ -421,34 +424,40 @@
                     //Apply in-Mem HJ if possible
                     if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
                         int tabSize = -1;
-                        if (buildPartSize < probePartSize) {
+                        if (isLeftOuter || buildPartSize < probePartSize) {
                             tabSize = ohhj.getBuildPartitionSizeInTup(pid);
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                             }
-                            //Build Side is smaller
-                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0,
-                                    buildSideReader, probeSideReader);
+                            //Build Side is smaller
+                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
+                                    buildSideReader, probeSideReader, false, pid);
 
                         } else { //Role Reversal
                             tabSize = ohhj.getProbePartitionSizeInTup(pid);
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                             }
                             //Probe Side is smaller
-                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1,
-                                    probeSideReader, buildSideReader);
+                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
+                                    probeSideReader, buildSideReader, true, pid);
                         }
                     }
                     //Apply (Recursive) HHJ
                     else {
                         OptimizedHybridHashJoin rHHj;
-                        if (buildPartSize < probePartSize) { //Build Side is smaller
+                        if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
 
                             int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
                                     nPartitions);
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
                                     probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
 
@@ -491,14 +500,14 @@
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+                                    
                                     if (rbrfw == null || rprfw == null) {
                                         continue;
                                     }
 
                                     int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
                                     int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
-                                    if (buildSideInTups < probeSideInTups) {
+                                    if (isLeftOuter || buildSideInTups < probeSideInTups) {
                                         applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
                                                 nljComparator0);
                                     } else {
@@ -510,6 +519,7 @@
                         } else { //Role Reversal (Probe Side is smaller)
                             int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
                                     nPartitions);
+                            
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
                                     buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
 
@@ -548,7 +558,7 @@
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+                                    
                                     if (rbrfw == null || rprfw == null) {
                                         continue;
                                     }
@@ -572,14 +582,14 @@
 
                 private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
                         RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
-                        ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader)
+                        ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
                         throws HyracksDataException {
 
                     ISerializableTable table = new SerializableHashTable(tabSize, ctx);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
                             ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
                             buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
-                            isLeftOuter, nullWriters1, table);
+                            isLeftOuter, nullWriters1, table, reverse);
 
                     bReader.open();
                     rPartbuff.clear();
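Two things change in the side-selection logic above. First, whenever isLeftOuter is true the build relation is always kept as the inner side: a left outer join must null-pad unmatched probe tuples, and the joiner can only do that for tuples arriving on its probe input, so role reversal is no longer allowed. Second, every applyInMemHashJoin call now threads a reverse flag (plus the partition id) down to InMemoryHashJoin so the emitted column order stays probe-then-build either way. A condensed restatement of the branch, as a sketch:

    boolean keepBuildAsInner = isLeftOuter || buildPartSize < probePartSize;
    if (keepBuildAsInner) {
        // hash table on the build relation; output is already [probe | build],
        // so reverse = false
    } else {
        // role reversal: hash table on the probe relation; reverse = true makes
        // the joiner swap sides on output, so downstream still sees [probe | build]
    }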
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
new file mode 100644
index 0000000..e74d610
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>hyracks-hdfs-0.20.2</artifactId>
+	<name>hyracks-hdfs-0.20.2</name>
+	<parent>
+		<artifactId>hyracks-hdfs</artifactId>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<version>0.2.3-SNAPSHOT</version>
+	</parent>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<forkMode>pertest</forkMode>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<profiles>
+		<profile>
+			<activation>
+				<activeByDefault>true</activeByDefault>
+			</activation>
+			<id>hadoop-0.20.2</id>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
+					<version>0.20.2</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-test</artifactId>
+					<version>0.20.2</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+			</activation>
+			<id>hadoop-1.0.4</id>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
+					<version>1.0.4</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>1.0.4</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-test</artifactId>
+					<version>1.0.4</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+	<dependencies>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-api</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
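The hadoop-0.20.2 profile is active by default; to compile this shim against Hadoop 1.0.4 instead, the other profile can be selected explicitly, e.g. mvn -Phadoop-1.0.4 clean install (standard Maven profile selection, not specific to this patch).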
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
new file mode 100644
index 0000000..a2b16c6
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * The wrapper to generate a TaskAttemptContext
+ */
+public class ContextFactory {
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+        try {
+            return new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null, split);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
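Both version shims expose a ContextFactory with the same signature, so calling code compiles against either jar: this 0.20.2 variant goes through Mapper().new Context(...), while the 0.23.1 variant below uses TaskAttemptContextImpl. A usage sketch mirroring the call site added to the hdfs2 read operator later in this patch (conf and split are assumed to come from the surrounding operator):

    ContextFactory ctxFactory = new ContextFactory();
    TaskAttemptContext context = ctxFactory.createContext(conf, split);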
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java
new file mode 100644
index 0000000..9133d35
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class MiniDFSClusterFactory {
+
+    public MiniDFSCluster getMiniDFSCluster(Configuration conf, int numberOfNC) throws HyracksDataException {
+        try {
+            return new MiniDFSCluster(conf, numberOfNC, true, null);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
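A test-side usage sketch (the two-datanode count is an arbitrary example):

    Configuration conf = new Configuration();
    // start an in-process HDFS with two datanodes via the version-appropriate API
    MiniDFSCluster dfsCluster = new MiniDFSClusterFactory().getMiniDFSCluster(conf, 2);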
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
new file mode 100644
index 0000000..27a1e33
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>hyracks-hdfs-0.23.1</artifactId>
+	<name>hyracks-hdfs-0.23.1</name>
+	<parent>
+		<artifactId>hyracks-hdfs</artifactId>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<version>0.2.3-SNAPSHOT</version>
+	</parent>
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<forkMode>pertest</forkMode>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<profiles>
+		<profile>
+			<activation>
+				<activeByDefault>true</activeByDefault>
+			</activation>
+			<id>hadoop-0.23.1</id>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+					<version>0.23.1</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<version>0.23.1</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+					<version>0.23.1</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>0.23.1</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<id>hadoop-0.23.6</id>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+					<version>0.23.6</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<version>0.23.6</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+					<version>0.23.6</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>0.23.6</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+	<dependencies>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-api</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
new file mode 100644
index 0000000..60ae5d3
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * The wrapper to generate a TaskAttemptContext
+ */
+public class ContextFactory {
+
+    public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+        try {
+            return new TaskAttemptContextImpl(conf, new TaskAttemptID());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java
new file mode 100644
index 0000000..ded75f1
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class MiniDFSClusterFactory {
+
+    public MiniDFSCluster getMiniDFSCluster(Configuration conf, int numberOfNC) throws HyracksDataException {
+        try {
+            MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+            builder.numDataNodes(numberOfNC);
+            MiniDFSCluster dfsCluster = builder.build();
+            return dfsCluster;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
new file mode 100644
index 0000000..fccfec4
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -0,0 +1,166 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>hyracks-hdfs-core</artifactId>
+	<name>hyracks-hdfs-core</name>
+	<parent>
+		<artifactId>hyracks-hdfs</artifactId>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<version>0.2.3-SNAPSHOT</version>
+	</parent>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<forkMode>pertest</forkMode>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
+				<configuration>
+					<filesets>
+						<fileset>
+							<directory>.</directory>
+							<includes>
+								<include>edu*</include>
+								<include>actual*</include>
+								<include>build*</include>
+								<include>expect*</include>
+								<include>ClusterController*</include>
+								<include>edu.uci.*</include>
+							</includes>
+						</fileset>
+					</filesets>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<profiles>
+		<profile>
+			<activation>
+				<activeByDefault>true</activeByDefault>
+			</activation>
+			<id>hadoop-0.20.2</id>
+			<dependencies>
+				<dependency>
+					<groupId>edu.uci.ics.hyracks</groupId>
+					<artifactId>hyracks-hdfs-0.20.2</artifactId>
+					<version>0.2.3-SNAPSHOT</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+			</activation>
+			<id>hadoop-1.0.4</id>
+			<dependencies>
+				<dependency>
+					<groupId>edu.uci.ics.hyracks</groupId>
+					<artifactId>hyracks-hdfs-0.20.2</artifactId>
+					<version>0.2.3-SNAPSHOT</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+			</activation>
+			<id>hadoop-0.23.1</id>
+			<dependencies>
+				<dependency>
+					<groupId>edu.uci.ics.hyracks</groupId>
+					<artifactId>hyracks-hdfs-0.23.1</artifactId>
+					<version>0.2.3-SNAPSHOT</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+			</activation>
+			<id>hadoop-0.23.6</id>
+			<dependencies>
+				<dependency>
+					<groupId>edu.uci.ics.hyracks</groupId>
+					<artifactId>hyracks-hdfs-0.23.1</artifactId>
+					<version>0.2.3-SNAPSHOT</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-api</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-std</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-common</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-cc</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-nc</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-io</artifactId>
+			<version>1.3.2</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>
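Each profile pulls in exactly one version shim, so hyracks-hdfs-core always compiles against a single ContextFactory/MiniDFSClusterFactory implementation: the hadoop-0.20.2 and hadoop-1.0.4 profiles map to the hyracks-hdfs-0.20.2 shim, and hadoop-0.23.1 and hadoop-0.23.6 map to the hyracks-hdfs-0.23.1 shim, presumably because the relevant APIs are compatible within each line.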
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
similarity index 94%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index a0c821a3..e924650 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -89,7 +89,6 @@
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
         final InputSplit[] inputSplits = splitsFactory.getSplits();
-        final JobConf conf = confFactory.getConf();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
@@ -97,7 +96,10 @@
             @SuppressWarnings("unchecked")
             @Override
             public void initialize() throws HyracksDataException {
+                ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
+                    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                    JobConf conf = confFactory.getConf();
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
                     writer.open();
                     InputFormat inputFormat = conf.getInputFormat();
@@ -133,6 +135,8 @@
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
         };
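The classloader handling added above is the usual save/switch/restore pattern: Hadoop resolves classes named in the configuration through the thread's context classloader, which on a Hyracks task thread may not see the application's jars. A minimal sketch of the pattern in isolation:

    ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
    try {
        // switch to the operator's own loader so Hadoop's reflective lookups succeed
        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        // ... Hadoop calls: confFactory.getConf(), input format creation, record reading ...
    } finally {
        // always restore the caller's loader, even on failure
        Thread.currentThread().setContextClassLoader(ctxCL);
    }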
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
similarity index 89%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index e29848c..ff97a29 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -71,20 +71,24 @@
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
-        final JobConf conf = confFactory.getConf();
-        final String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
 
-            private String fileName = outputDirPath + File.separator + "part-" + partition;
             private FSDataOutputStream dos;
             private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
             private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
             private FrameTupleReference tuple = new FrameTupleReference();
             private ITupleWriter tupleWriter;
+            private ClassLoader ctxCL;
 
             @Override
             public void open() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                JobConf conf = confFactory.getConf();
+                String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
+                String fileName = outputDirPath + File.separator + "part-" + partition;
+
                 tupleWriter = tupleWriterFactory.getTupleWriter();
                 try {
                     FileSystem dfs = FileSystem.get(conf);
@@ -115,6 +119,8 @@
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryComparatorFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
similarity index 83%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 8a1ba6d..90f5603 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -24,7 +24,6 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -37,12 +36,13 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
 
 /**
- * The HDFS file read operator using the Hadoop new API.
- * To use this operator, a user need to provide an IKeyValueParserFactory implementation which convert
+ * The HDFS file read operator using the Hadoop new API. To use this operator, a
+ * user needs to provide an IKeyValueParserFactory implementation which converts
  * key-value pairs into tuples.
  */
 @SuppressWarnings("rawtypes")
@@ -63,12 +63,16 @@
      * @param rd
      *            the output record descriptor
      * @param conf
-     *            the Hadoop JobConf object, which contains the input format and the input paths
+     *            the Hadoop JobConf object, which contains the input format and
+     *            the input paths
      * @param splits
      *            the array of FileSplits (HDFS chunks).
      * @param scheduledLocations
-     *            the node controller names to scan the FileSplits, which is an one-to-one mapping. The String array
-     *            is obtained from the edu.cui.ics.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints(InputSplits[]).
+     *            the node controller names to scan the FileSplits, which is a
+     *            one-to-one mapping. The String array is obtained from
+     *            edu.uci.ics.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints(InputSplits[]).
      * @param tupleParserFactory
      *            the ITupleParserFactory implementation instance.
      * @throws HyracksException
@@ -97,21 +101,23 @@
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
-        final Job conf = confFactory.getConf();
         final List<FileSplit> inputSplits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+            private ContextFactory ctxFactory = new ContextFactory();
 
             @SuppressWarnings("unchecked")
             @Override
             public void initialize() throws HyracksDataException {
+                ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                    Job job = confFactory.getConf();
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
                     writer.open();
-                    InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(),
-                            conf.getConfiguration());
+                    InputFormat inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(),
+                            job.getConfiguration());
                     int size = inputSplits.size();
                     for (int i = 0; i < size; i++) {
                         /**
@@ -119,8 +125,8 @@
                          */
                         if (scheduledLocations[i].equals(nodeName)) {
                             /**
-                             * pick an unread split to read
-                             * synchronize among simultaneous partitions in the same machine
+                             * pick an unread split to read; synchronize among
+                             * simultaneous partitions on the same machine
                              */
                             synchronized (executed) {
                                 if (executed[i] == false) {
@@ -133,8 +139,8 @@
                             /**
                              * read the split
                              */
-                            TaskAttemptContext context = new TaskAttemptContext(conf.getConfiguration(),
-                                    new TaskAttemptID());
+                            TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(),
+                                    inputSplits.get(i));
                             RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
                             reader.initialize(inputSplits.get(i), context);
                             while (reader.nextKeyValue() == true) {
@@ -146,6 +152,8 @@
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
         };
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
similarity index 89%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 32bb9dc..390a7b5 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -22,8 +22,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -72,21 +70,24 @@
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
-        final Job conf = confFactory.getConf();
-        final String outputPath = FileOutputFormat.getOutputPath(new JobContext(conf.getConfiguration(), new JobID()))
-                .toString();
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
 
-            private String fileName = outputPath + File.separator + "part-" + partition;
             private FSDataOutputStream dos;
             private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
             private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
             private FrameTupleReference tuple = new FrameTupleReference();
             private ITupleWriter tupleWriter;
+            private ClassLoader ctxCL;
 
             @Override
             public void open() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                Job conf = confFactory.getConf();
+                String outputPath = FileOutputFormat.getOutputPath(conf).toString();
+                String fileName = outputPath + File.separator + "part-" + partition;
+
                 tupleWriter = tupleWriterFactory.getTupleWriter();
                 try {
                     FileSystem dfs = FileSystem.get(conf.getConfiguration());
@@ -117,6 +118,8 @@
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
similarity index 76%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index 0087307..4b8a278 100644
--- a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -41,23 +41,17 @@
     public void testSchedulerSimple() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         InputSplit[] fileSplits = new InputSplit[6];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -85,23 +79,17 @@
     public void testSchedulerLargerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -136,23 +124,17 @@
     public void testSchedulerSmallerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -187,23 +169,17 @@
     public void testSchedulerSmallerHDFSOdd() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         InputSplit[] fileSplits = new InputSplit[13];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
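
Each scheduler test above (and the hdfs2 copies further down) rebuilds the same six-node map by hand, now through the single-address NodeControllerInfo constructor. A loop-based helper would remove the sixfold repetition. A minimal sketch, assuming the three-argument constructor the updated tests call; the package locations below follow the Hyracks 0.2.x API and should be treated as assumptions:

    import java.net.InetAddress;
    import java.util.HashMap;
    import java.util.Map;

    import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
    import edu.uci.ics.hyracks.api.client.NodeStatus;
    import edu.uci.ics.hyracks.api.comm.NetworkAddress;

    public class SchedulerTestUtils {
        // Builds the ncNameToNcInfos map used by every scheduler test:
        // nodes nc1..ncN bound to 10.0.0.1..10.0.0.N on the given port.
        public static Map<String, NodeControllerInfo> makeNcInfos(int n, int port) throws Exception {
            Map<String, NodeControllerInfo> infos = new HashMap<String, NodeControllerInfo>();
            for (int i = 1; i <= n; i++) {
                byte[] addr = InetAddress.getByName("10.0.0." + i).getAddress();
                infos.put("nc" + i, new NodeControllerInfo("nc" + i, NodeStatus.ALIVE,
                        new NetworkAddress(addr, port)));
            }
            return infos;
        }
    }

Each test body could then open with ncNameToNcInfos = SchedulerTestUtils.makeNcInfos(6, 5099).
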
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
similarity index 96%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
index 508ba07..9f77979 100644
--- a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -32,8 +32,6 @@
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -53,6 +51,7 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.MiniDFSClusterFactory;
 import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
 import edu.uci.ics.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.hdfs.lib.TextKeyValueParserFactory;
@@ -78,6 +77,7 @@
     private static final String HYRACKS_APP_NAME = "DataflowTest";
     private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
     private MiniDFSCluster dfsCluster;
+    private MiniDFSClusterFactory dfsClusterFactory = new MiniDFSClusterFactory();
 
     private Job conf;
     private int numberOfNC = 2;
@@ -113,7 +113,7 @@
         FileSystem lfs = FileSystem.getLocal(new Configuration());
         lfs.delete(new Path("build"), true);
         System.setProperty("hadoop.log.dir", "logs");
-        dfsCluster = new MiniDFSCluster(conf.getConfiguration(), numberOfNC, true, null);
+        dfsCluster = dfsClusterFactory.getMiniDFSCluster(conf.getConfiguration(), numberOfNC);
         FileSystem dfs = FileSystem.get(conf.getConfiguration());
         Path src = new Path(DATA_PATH);
         Path dest = new Path(HDFS_INPUT_PATH);
@@ -141,7 +141,7 @@
 
         Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
         InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(), conf.getConfiguration());
-        List<InputSplit> splits = inputFormat.getSplits(new JobContext(conf.getConfiguration(), new JobID()));
+        List<InputSplit> splits = inputFormat.getSplits(conf);
 
         String[] readSchedule = scheduler.getLocationConstraints(splits);
         JobSpecification jobSpec = new JobSpecification();
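
The JobContext replacement above works because a Job is itself a JobContext in both Hadoop API generations, while JobContext stopped being directly instantiable in 0.23 (it turned into an interface). A minimal, self-contained sketch of enumerating splits that way, mirroring the patched test:

    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.ReflectionUtils;

    public class SplitEnumeration {
        public static void main(String[] args) throws Exception {
            Job job = new Job();
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            InputFormat<?, ?> inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(),
                    job.getConfiguration());
            // Pass the Job directly: it satisfies getSplits(JobContext) on
            // both 0.20 (JobContext is a class) and 0.23 (an interface).
            List<InputSplit> splits = inputFormat.getSplits(job);
            System.out.println("number of splits: " + splits.size());
        }
    }
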
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
similarity index 76%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index 79f0874..ea2af13 100644
--- a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -46,23 +46,17 @@
     public void testSchedulerSimple() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -90,23 +84,17 @@
     public void testSchedulerLargerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -141,23 +129,17 @@
     public void testSchedulerSmallerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -192,23 +174,17 @@
     public void testSchedulerSmallerHDFSOdd() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.1").getAddress(), 5099)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.2").getAddress(), 5099)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.3").getAddress(), 5099)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.4").getAddress(), 5099)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.5").getAddress(), 5099)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
-                .getAddress(), 5098)));
+                .getByName("10.0.0.6").getAddress(), 5099)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
diff --git a/hyracks/hyracks-hdfs/src/test/resources/data/customer.tbl b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/data/customer.tbl
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/data/customer.tbl
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/data/customer.tbl
diff --git a/hyracks/hyracks-hdfs/src/test/resources/expected/part-0 b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/expected/part-0
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/expected/part-0
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/expected/part-0
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/core-site.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/core-site.xml
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/core-site.xml
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/core-site.xml
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/hdfs-site.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/hdfs-site.xml
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/log4j.properties b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/log4j.properties
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml
diff --git a/hyracks/hyracks-hdfs/pom.xml b/hyracks/hyracks-hdfs/pom.xml
index b80dbd5..5ed76e9 100644
--- a/hyracks/hyracks-hdfs/pom.xml
+++ b/hyracks/hyracks-hdfs/pom.xml
@@ -1,118 +1,19 @@
-<?xml version="1.0"?>
-<project
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
-	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<artifactId>hyracks</artifactId>
-		<groupId>edu.uci.ics.hyracks</groupId>
-		<version>0.2.3-SNAPSHOT</version>
-	</parent>
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-hdfs</artifactId>
+  <packaging>pom</packaging>
+  <name>hyracks-hdfs</name>
 
-	<artifactId>hyracks-hdfs</artifactId>
-	<name>hyracks-hdfs</name>
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.3-SNAPSHOT</version>
+  </parent>
 
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>2.0.2</version>
-				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<version>2.7.2</version>
-				<configuration>
-					<forkMode>pertest</forkMode>
-					<includes>
-						<include>**/*TestSuite.java</include>
-						<include>**/*Test.java</include>
-					</includes>
-				</configuration>
-			</plugin>
-			<plugin>
-				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.5</version>
-				<configuration>
-					<filesets>
-						<fileset>
-							<directory>.</directory>
-							<includes>
-								<include>edu*</include>
-								<include>actual*</include>
-								<include>build*</include>
-								<include>expect*</include>
-								<include>ClusterController*</include>
-								<include>edu.uci.*</include>
-							</includes>
-						</fileset>
-					</filesets>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-
-	<dependencies>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>3.8.1</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-api</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-dataflow-std</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-dataflow-common</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-control-cc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-control-nc</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-test</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-io</artifactId>
-			<version>1.3.2</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
+  <modules>
+    <module>hyracks-hdfs-0.20.2</module>
+    <module>hyracks-hdfs-0.23.1</module>
+    <module>hyracks-hdfs-core</module>
+  </modules>
 </project>
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index dabc8a4..8fa99be 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -58,6 +58,7 @@
 
     ChannelControlBlock allocateChannel() throws NetException {
         synchronized (mConn) {
+            cleanupClosedChannels();
             int idx = allocationBitmap.nextClearBit(0);
             if (idx < 0 || idx >= ccbArray.length) {
                 cleanupClosedChannels();
@@ -231,4 +232,4 @@
             ccbArray = Arrays.copyOf(ccbArray, ccbArray.length * 2);
         }
     }
-}
\ No newline at end of file
+}
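
The ChannelSet change moves closed-channel reclamation to the front of allocateChannel(), so freed slots are reused before the allocator considers growing the array. The sketch below illustrates that allocate-after-cleanup pattern with a plain BitSet; the class and its slot array are illustrative, not the actual ChannelSet internals:

    import java.util.BitSet;

    class SlotAllocator {
        private final BitSet allocationBitmap = new BitSet();
        private Object[] slots = new Object[8];

        synchronized int allocate() {
            cleanupClosedSlots(); // eager reclamation, as in the patched allocateChannel()
            int idx = allocationBitmap.nextClearBit(0);
            if (idx >= slots.length) {
                // no free slot even after cleanup: double the capacity
                Object[] grown = new Object[slots.length * 2];
                System.arraycopy(slots, 0, grown, 0, slots.length);
                slots = grown;
            }
            allocationBitmap.set(idx);
            return idx;
        }

        synchronized void free(int idx) {
            slots[idx] = null;
            allocationBitmap.clear(idx);
        }

        private void cleanupClosedSlots() {
            // In ChannelSet this walks the channel array and clears bits for
            // channels whose both directions have completed; elided here.
        }
    }
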
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 2caa93b..0643804 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -77,11 +77,11 @@
 			<scope>test</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index b7f9e3d..a8cd3db 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -26,7 +26,7 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -66,7 +66,7 @@
     /** List of incoming messages from the previous superstep */
     private final List<M> msgList = new ArrayList<M>();
     /** map context */
-    private static Mapper.Context context = null;
+    private static TaskAttemptContext context = null;
     /** a delegate for hyracks stuff */
     private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
     /** this vertex is updated or not */
@@ -444,8 +444,10 @@
     /**
      * Add a new vertex into the graph
      * 
-     * @param vertexId the vertex id
-     * @param vertex the vertex
+     * @param vertexId
+     *            the vertex id
+     * @param vertex
+     *            the vertex
      */
     public final void addVertex(I vertexId, V vertex) {
         delegate.addVertex(vertexId, vertex);
@@ -454,7 +456,8 @@
     /**
      * Delete a vertex from id
      * 
-     * @param vertexId  the vertex id
+     * @param vertexId
+     *            the vertex id
      */
     public final void deleteVertex(I vertexId) {
         delegate.deleteVertex(vertexId);
@@ -528,7 +531,7 @@
     /**
      * Pregelix internal use only
      */
-    public static final Mapper<?, ?, ?, ?>.Context getContext() {
+    public static final TaskAttemptContext getContext() {
         return context;
     }
 
@@ -537,7 +540,7 @@
      * 
      * @param context
      */
-    public static final void setContext(Mapper<?, ?, ?, ?>.Context context) {
+    public static final void setContext(TaskAttemptContext context) {
         Vertex.context = context;
     }
 
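
Storing the context as TaskAttemptContext instead of the concrete Mapper.Context means Vertex no longer cares how the context was built or which Hadoop version built it; everything it reads still arrives through the supertype. A minimal sketch (the configuration key mirrors PageRankVertex.ITERATIONS further down and is otherwise arbitrary):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Code that only needs configuration values can accept the
    // TaskAttemptContext supertype, which keeps it compilable against both
    // Hadoop 0.20 (where it is a class) and 0.23 (where it is an interface).
    class ContextReader {
        static int readMaxIteration(TaskAttemptContext context) {
            Configuration conf = context.getConfiguration();
            return conf.getInt("HyracksPageRankVertex.iteration", 10);
        }
    }
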
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 34faa82..0908829 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -244,11 +244,6 @@
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
@@ -281,13 +276,6 @@
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-test</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
 			<groupId>com.kenai.nbpwr</groupId>
 			<artifactId>org-apache-commons-io</artifactId>
 			<version>1.3.1-201002241208</version>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 1b6f195..f07a246 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -72,22 +72,23 @@
     public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
             throws HyracksException {
         applicationName = exampleClass.getSimpleName() + UUID.randomUUID();
-        /** add hadoop configurations */
-        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
-        job.getConfiguration().addResource(hadoopCore);
-        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
-        job.getConfiguration().addResource(hadoopMapRed);
-        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
-        job.getConfiguration().addResource(hadoopHdfs);
-        ClusterConfig.loadClusterConfig(ipAddress, port);
-
-        LOG.info("job started");
-        long start = System.currentTimeMillis();
-        long end = start;
-        long time = 0;
-
-        this.profiling = profiling;
         try {
+            /** add hadoop configurations */
+            URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+            job.getConfiguration().addResource(hadoopCore);
+            URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+            job.getConfiguration().addResource(hadoopMapRed);
+            URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+            job.getConfiguration().addResource(hadoopHdfs);
+            ClusterConfig.loadClusterConfig(ipAddress, port);
+
+            LOG.info("job started");
+            long start = System.currentTimeMillis();
+            long end = start;
+            long time = 0;
+
+            this.profiling = profiling;
+
             switch (planChoice) {
                 case INNER_JOIN:
                     jobGen = new JobGenInnerJoin(job);
@@ -146,6 +147,16 @@
             LOG.info("result writing finished " + time + "ms");
             LOG.info("job finished");
         } catch (Exception e) {
+            try {
+                /**
+                 * destroy the application if any exception occurs
+                 */
+                if (hcc != null) {
+                    destroyApplication(applicationName);
+                }
+            } catch (Exception e2) {
+                throw new HyracksException(e2);
+            }
             throw new HyracksException(e);
         }
     }
@@ -224,8 +235,8 @@
         LOG.info("jar deployment finished " + (end - start) + "ms");
     }
 
-    public void destroyApplication(String jarFile) throws Exception {
-        hcc.destroyApplication(applicationName);
+    public void destroyApplication(String appName) throws Exception {
+        hcc.destroyApplication(appName);
     }
 
 }
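
The Driver change widens the try block so configuration loading is covered too, and tears down the deployed application before rethrowing. One side effect of wrapping e2: if destroyApplication() itself fails, the cleanup exception replaces the original failure. A sketch of the same cleanup-on-failure pattern that keeps the root cause instead (illustrative class, not the Driver itself):

    class JobRunner {
        void runJob(String applicationName) throws Exception {
            try {
                deploy(applicationName);
                execute();
            } catch (Exception e) {
                try {
                    destroyApplication(applicationName);
                } catch (Exception cleanupFailure) {
                    // swallow: the cleanup failure is secondary to the root
                    // cause (Throwable.addSuppressed is the Java 7+ idiom)
                }
                throw e; // rethrow the original failure, not the cleanup one
            }
        }

        void deploy(String appName) throws Exception { /* elided */ }
        void execute() throws Exception { /* elided */ }
        void destroyApplication(String appName) throws Exception { /* elided */ }
    }
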
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 6c039bf..3f3ba00 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow-std</artifactId>
 	<packaging>jar</packaging>
 	<name>pregelix-dataflow-std</name>
 
 	<parent>
-    		<groupId>edu.uci.ics.hyracks</groupId>
-    		<artifactId>pregelix</artifactId>
-    		<version>0.2.3-SNAPSHOT</version>
-  	</parent>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<artifactId>pregelix</artifactId>
+		<version>0.2.3-SNAPSHOT</version>
+	</parent>
 
 
 	<properties>
@@ -101,11 +102,13 @@
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
@@ -144,12 +147,5 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
-		<dependency>
-			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-hdfs</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index ee3ac82..4cbd6c4 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -61,22 +61,21 @@
             private FrameDeserializer frameDeserializer;
             private final IFrameWriter[] writers = new IFrameWriter[outputArity];
             private final IFunction function = functionFactory.createFunction();
+            private ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
 
             @Override
-            public void close() throws HyracksDataException {
-                if (postHookFactory != null)
-                    postHookFactory.createRuntimeHook().configure(ctx);
-                function.close();
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                 for (IFrameWriter writer : writers) {
-                    writer.close();
+                    writer.open();
                 }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.fail();
-                }
+                if (preHookFactory != null)
+                    preHookFactory.createRuntimeHook().configure(ctx);
+                function.open(ctx, rd0, writers);
             }
 
             @Override
@@ -89,17 +88,21 @@
             }
 
             @Override
-            public void open() throws HyracksDataException {
-                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
-                        : inputRdFactory.createRecordDescriptor();
-                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
-                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+            public void close() throws HyracksDataException {
+                if (postHookFactory != null)
+                    postHookFactory.createRuntimeHook().configure(ctx);
+                function.close();
                 for (IFrameWriter writer : writers) {
-                    writer.open();
+                    writer.close();
                 }
-                if (preHookFactory != null)
-                    preHookFactory.createRuntimeHook().configure(ctx);
-                function.open(ctx, rd0, writers);
+                Thread.currentThread().setContextClassLoader(ctxCL);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.fail();
+                }
             }
 
             @Override
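
FunctionCallOperatorDescriptor now captures the caller's context class loader in open() and restores it in close(), so the swap no longer leaks past the operator's lifetime. Compressed into a single method, the pattern is the sketch below; in the operator the capture and restore are split across open() and close() because the swapped loader must stay active for the nextFrame() calls in between:

    class ClassLoaderScope {
        // Run body with the given class loader installed, always restoring
        // the caller's loader afterwards, even on failure.
        static void runWithClassLoader(ClassLoader cl, Runnable body) {
            Thread current = Thread.currentThread();
            ClassLoader saved = current.getContextClassLoader();
            current.setContextClassLoader(cl);
            try {
                body.run();
            } finally {
                current.setContextClassLoader(saved);
            }
        }
    }
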
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 82ac18e..99bca1a 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -37,6 +37,7 @@
     private final IFrameWriter[] writers;
     private TupleDeserializer tupleDe;
     private RecordDescriptor inputRd;
+    private ClassLoader ctxCL;
 
     public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
             IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
@@ -57,6 +58,7 @@
     public void functionOpen() throws HyracksDataException {
         inputRd = inputRdFactory.createRecordDescriptor();
         tupleDe = new TupleDeserializer(inputRd);
+        ctxCL = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         for (IFrameWriter writer : writers) {
             writer.open();
@@ -124,5 +126,6 @@
         for (IFrameWriter writer : writers) {
             writer.close();
         }
+        Thread.currentThread().setContextClassLoader(ctxCL);
     }
 }
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 31b6adc..107f059 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow</artifactId>
 	<packaging>jar</packaging>
 	<name>pregelix-dataflow</name>
 
 	<parent>
-    		<groupId>edu.uci.ics.hyracks</groupId>
-    		<artifactId>pregelix</artifactId>
-    		<version>0.2.3-SNAPSHOT</version>
-  	</parent>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<artifactId>pregelix</artifactId>
+		<version>0.2.3-SNAPSHOT</version>
+	</parent>
 
 
 	<properties>
@@ -103,13 +104,6 @@
 			<version>0.2.3-SNAPSHOT</version>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
@@ -144,12 +138,5 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
-		<dependency>
-			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-hdfs</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index c25e4c6..0133d761 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -64,17 +64,20 @@
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             private RecordDescriptor rd0;
             private FrameDeserializer frameDeserializer;
-            private Configuration conf = confFactory.createConfiguration();
+            private Configuration conf;
             private VertexWriter vertexWriter;
             private TaskAttemptContext context;
             private String TEMP_DIR = "_temporary";
+            private ClassLoader ctxCL;
 
             @Override
             public void open() throws HyracksDataException {
                 rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
                         : inputRdFactory.createRecordDescriptor();
                 frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                ctxCL = Thread.currentThread().getContextClassLoader();
                 Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                conf = confFactory.createConfiguration();
 
                 VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
                 TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
@@ -107,7 +110,7 @@
 
             @Override
             public void fail() throws HyracksDataException {
-
+                Thread.currentThread().setContextClassLoader(ctxCL);
             }
 
             @Override
@@ -151,6 +154,8 @@
                     dfs.rename(srcFile, filePath);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
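
Besides restoring the class loader in fail() and close(), this hunk defers confFactory.createConfiguration() from a field initializer into open(). The ordering matters: a field initializer runs under the caller's class loader, while inside open() the operator's loader is already installed, so Configuration class and resource lookups resolve against the right classpath. A compressed sketch of that deferral (the operator itself keeps the swapped loader until close()):

    import org.apache.hadoop.conf.Configuration;

    abstract class DeferredConfOperator {
        private Configuration conf; // deliberately not initialized at construction time

        public void open() {
            ClassLoader saved = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            try {
                conf = createConfiguration(); // now runs under the operator's loader
                // ... operator setup using conf ...
            } finally {
                Thread.currentThread().setContextClassLoader(saved);
            }
        }

        protected Configuration createConfiguration() {
            return new Configuration();
        }
    }
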
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index cb0e339..a38b19e 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -25,8 +25,6 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -43,6 +41,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
 import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
@@ -83,12 +82,15 @@
         final List<FileSplit> splits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
-            private Configuration conf = confFactory.createConfiguration();
+            private ClassLoader ctxCL;
+            private ContextFactory ctxFactory = new ContextFactory();
 
             @Override
             public void initialize() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                    Configuration conf = confFactory.createConfiguration();
                     writer.open();
                     for (int i = 0; i < scheduledLocations.length; i++) {
                         if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
@@ -102,12 +104,14 @@
                                     continue;
                                 }
                             }
-                            loadVertices(ctx, i);
+                            loadVertices(ctx, conf, i);
                         }
                     }
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
@@ -122,8 +126,9 @@
              * @throws InterruptedException
              */
             @SuppressWarnings("unchecked")
-            private void loadVertices(final IHyracksTaskContext ctx, int splitId) throws IOException,
-                    ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+            private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
+                    throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
+                    IllegalAccessException {
                 ByteBuffer frame = ctx.allocateFrame();
                 FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 appender.reset(frame, true);
@@ -141,8 +146,7 @@
                 /**
                  * set context
                  */
-                Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
-                        splits.get(splitId));
+                TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splits.get(splitId));
                 Vertex.setContext(mapperContext);
 
                 /**
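
The removed new Mapper().new Context(...) compiles only against Hadoop 0.20, where Mapper.Context is a concrete inner class; routing construction through ContextFactory lets each hyracks-hdfs version module supply its own implementation behind the TaskAttemptContext supertype. A plausible shape for the 0.20.2 side is sketched below, with the constructor arguments copied from the removed call; this is an assumption about the factory, not the shipped code:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class ContextFactory {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws IOException {
            try {
                // Legal on 0.20 only: Mapper.Context is a concrete inner class there.
                return new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null, split);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }
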
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 02e1625..b6d4da7 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -54,6 +54,7 @@
     public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
     private DoubleWritable outputValue = new DoubleWritable();
     private DoubleWritable tmpVertexValue = new DoubleWritable();
+    private int maxIteration = -1;
 
     /**
      * Test whether combiner is called by summing up the messages.
@@ -97,7 +98,9 @@
 
     @Override
     public void compute(Iterator<DoubleWritable> msgIterator) {
-        int maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+        if (maxIteration < 0) {
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+        }
         if (getSuperstep() == 1) {
             tmpVertexValue.set(1.0 / getNumVertices());
             setVertexValue(tmpVertexValue);
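
PageRankVertex (and ReachabilityVertex just below) now reads its job parameter once and caches it in a field, since compute() runs for every vertex in every superstep and getConfiguration().getInt() re-parses the stored string value on each call. The idiom, with the same negative sentinel:

    import org.apache.hadoop.conf.Configuration;

    // Lazy one-time read of a job parameter; conf lookup and string parsing
    // then happen once per task instead of once per compute() call.
    class IterationLimit {
        private int maxIteration = -1; // -1 means "not read yet"

        int get(Configuration conf) {
            if (maxIteration < 0) {
                maxIteration = conf.getInt("HyracksPageRankVertex.iteration", 10);
            }
            return maxIteration;
        }
    }
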
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index daafc82..0895386 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -85,6 +85,7 @@
     }
 
     private ByteWritable tmpVertexValue = new ByteWritable();
+    private long sourceId = -1;
 
     /** The source vertex id */
     public static final String SOURCE_ID = "ReachibilityVertex.sourceId";
@@ -101,7 +102,7 @@
      * @return True if the source id
      */
     private boolean isSource(VLongWritable v) {
-        return (v.get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT));
+        return (v.get() == sourceId);
     }
 
     /**
@@ -115,6 +116,9 @@
 
     @Override
     public void compute(Iterator<ByteWritable> msgIterator) {
+        if (sourceId < 0) {
+            sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+        }
         if (getSuperstep() == 1) {
             boolean isSource = isSource(getVertexId());
             if (isSource) {
@@ -160,7 +164,7 @@
         }
         voteToHalt();
     }
-    
+
     @Override
     public String toString() {
         return getVertexId() + " " + getVertexValue();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 7a5bba6..5a556fa 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -38,7 +38,7 @@
 
 public class RunJobTestCase extends TestCase {
     private static final String NC1 = "nc1";
-    private static final String HYRACKS_APP_NAME = "giraph";
+    private static final String HYRACKS_APP_NAME = "pregelix";
     private static String HDFS_INPUTPATH = "/webmap";
     private static String HDFS_OUTPUTPAH = "/result";
 
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index b5abd94..79a5c3c 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -41,168 +41,176 @@
 
 @SuppressWarnings("deprecation")
 public class RunJobTestSuite extends TestSuite {
-    private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class.getName());
+	private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class
+			.getName());
 
-    private static final String ACTUAL_RESULT_DIR = "actual";
-    private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
-    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
-    private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
-    private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
-    private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
-    private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
-    private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
-    private static final String FILE_EXTENSION_OF_RESULTS = "result";
+	private static final String ACTUAL_RESULT_DIR = "actual";
+	private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
+	private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+	private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+	private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+	private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+	private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
+	private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
+	private static final String FILE_EXTENSION_OF_RESULTS = "result";
 
-    private static final String DATA_PATH = "data/webmap/webmap_link.txt";
-    private static final String HDFS_PATH = "/webmap/";
+	private static final String DATA_PATH = "data/webmap/webmap_link.txt";
+	private static final String HDFS_PATH = "/webmap/";
 
-    private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
-    private static final String HDFS_PATH2 = "/webmapcomplex/";
+	private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
+	private static final String HDFS_PATH2 = "/webmapcomplex/";
 
-    private static final String DATA_PATH3 = "data/clique/clique.txt";
-    private static final String HDFS_PATH3 = "/clique/";
+	private static final String DATA_PATH3 = "data/clique/clique.txt";
+	private static final String HDFS_PATH3 = "/clique/";
 
-    private static final String HYRACKS_APP_NAME = "giraph";
-    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
-    private MiniDFSCluster dfsCluster;
+	private static final String HYRACKS_APP_NAME = "pregelix";
+	private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+			+ File.separator + "conf.xml";
+	private MiniDFSCluster dfsCluster;
 
-    private JobConf conf = new JobConf();
-    private int numberOfNC = 2;
+	private JobConf conf = new JobConf();
+	private int numberOfNC = 2;
 
-    public void setUp() throws Exception {
-        ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
-        ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
-        cleanupStores();
-        PregelixHyracksIntegrationUtil.init();
-        PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
-        LOGGER.info("Hyracks mini-cluster started");
-        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
-        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
-        startHDFS();
-    }
+	public void setUp() throws Exception {
+		ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+		ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+		cleanupStores();
+		PregelixHyracksIntegrationUtil.init();
+		PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+		LOGGER.info("Hyracks mini-cluster started");
+		FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+		FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+		startHDFS();
+	}
 
-    private void cleanupStores() throws IOException {
-        FileUtils.forceMkdir(new File("teststore"));
-        FileUtils.forceMkdir(new File("build"));
-        FileUtils.cleanDirectory(new File("teststore"));
-        FileUtils.cleanDirectory(new File("build"));
-    }
+	private void cleanupStores() throws IOException {
+		FileUtils.forceMkdir(new File("teststore"));
+		FileUtils.forceMkdir(new File("build"));
+		FileUtils.cleanDirectory(new File("teststore"));
+		FileUtils.cleanDirectory(new File("build"));
+	}
 
-    private void startHDFS() throws IOException {
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-        FileSystem lfs = FileSystem.getLocal(new Configuration());
-        lfs.delete(new Path("build"), true);
-        System.setProperty("hadoop.log.dir", "logs");
-        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
-        FileSystem dfs = FileSystem.get(conf);
-        Path src = new Path(DATA_PATH);
-        Path dest = new Path(HDFS_PATH);
-        dfs.mkdirs(dest);
-        dfs.copyFromLocalFile(src, dest);
+	private void startHDFS() throws IOException {
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+		FileSystem lfs = FileSystem.getLocal(new Configuration());
+		lfs.delete(new Path("build"), true);
+		System.setProperty("hadoop.log.dir", "logs");
+		dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+		FileSystem dfs = FileSystem.get(conf);
+		Path src = new Path(DATA_PATH);
+		Path dest = new Path(HDFS_PATH);
+		dfs.mkdirs(dest);
+		dfs.copyFromLocalFile(src, dest);
 
-        src = new Path(DATA_PATH2);
-        dest = new Path(HDFS_PATH2);
-        dfs.mkdirs(dest);
-        dfs.copyFromLocalFile(src, dest);
+		src = new Path(DATA_PATH2);
+		dest = new Path(HDFS_PATH2);
+		dfs.mkdirs(dest);
+		dfs.copyFromLocalFile(src, dest);
 
-        src = new Path(DATA_PATH3);
-        dest = new Path(HDFS_PATH3);
-        dfs.mkdirs(dest);
-        dfs.copyFromLocalFile(src, dest);
+		src = new Path(DATA_PATH3);
+		dest = new Path(HDFS_PATH3);
+		dfs.mkdirs(dest);
+		dfs.copyFromLocalFile(src, dest);
 
-        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
-        conf.writeXml(confOutput);
-        confOutput.flush();
-        confOutput.close();
-    }
+		DataOutputStream confOutput = new DataOutputStream(
+				new FileOutputStream(new File(HADOOP_CONF_PATH)));
+		conf.writeXml(confOutput);
+		confOutput.flush();
+		confOutput.close();
+	}
 
-    /**
-     * cleanup hdfs cluster
-     */
-    private void cleanupHDFS() throws Exception {
-        dfsCluster.shutdown();
-    }
+	/**
+	 * Shuts down the HDFS mini-cluster.
+	 */
+	private void cleanupHDFS() throws Exception {
+		dfsCluster.shutdown();
+	}
 
-    public void tearDown() throws Exception {
-        PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
-        PregelixHyracksIntegrationUtil.deinit();
-        LOGGER.info("Hyracks mini-cluster shut down");
-        cleanupHDFS();
-    }
+	public void tearDown() throws Exception {
+		PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
+		PregelixHyracksIntegrationUtil.deinit();
+		LOGGER.info("Hyracks mini-cluster shut down");
+		cleanupHDFS();
+	}
 
-    public static Test suite() throws Exception {
-        List<String> ignores = getFileList(PATH_TO_IGNORE);
-        List<String> onlys = getFileList(PATH_TO_ONLY);
-        File testData = new File(PATH_TO_JOBS);
-        File[] queries = testData.listFiles();
-        RunJobTestSuite testSuite = new RunJobTestSuite();
-        testSuite.setUp();
-        boolean onlyEnabled = false;
+	public static Test suite() throws Exception {
+		List<String> ignores = getFileList(PATH_TO_IGNORE);
+		List<String> onlys = getFileList(PATH_TO_ONLY);
+		File testData = new File(PATH_TO_JOBS);
+		File[] queries = testData.listFiles();
+		RunJobTestSuite testSuite = new RunJobTestSuite();
+		testSuite.setUp();
+		boolean onlyEnabled = false;
 
-        if (onlys.size() > 0) {
-            onlyEnabled = true;
-        }
-        for (File qFile : queries) {
-            if (isInList(ignores, qFile.getName()))
-                continue;
+		if (onlys.size() > 0) {
+			onlyEnabled = true;
+		}
+		for (File qFile : queries) {
+			if (isInList(ignores, qFile.getName()))
+				continue;
 
-            if (qFile.isFile()) {
-                if (onlyEnabled && !isInList(onlys, qFile.getName())) {
-                    continue;
-                } else {
-                    String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
-                    String expectedFileName = EXPECTED_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
-                    testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile.getAbsolutePath()
-                            .toString(), resultFileName, expectedFileName));
-                }
-            }
-        }
-        return testSuite;
-    }
+			if (qFile.isFile()) {
+				if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+					continue;
+				} else {
+					String resultFileName = ACTUAL_RESULT_DIR + File.separator
+							+ jobExtToResExt(qFile.getName());
+					String expectedFileName = EXPECTED_RESULT_DIR
+							+ File.separator + jobExtToResExt(qFile.getName());
+					testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH,
+							qFile.getName(),
+							qFile.getAbsolutePath().toString(), resultFileName,
+							expectedFileName));
+				}
+			}
+		}
+		return testSuite;
+	}
 
-    /**
-     * Runs the tests and collects their result in a TestResult.
-     */
-    @Override
-    public void run(TestResult result) {
-        try {
-            int testCount = countTestCases();
-            for (int i = 0; i < testCount; i++) {
-                // cleanupStores();
-                Test each = this.testAt(i);
-                if (result.shouldStop())
-                    break;
-                runTest(each, result);
-            }
-            tearDown();
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
+	/**
+	 * Runs the tests and collects their results in a TestResult.
+	 */
+	@Override
+	public void run(TestResult result) {
+		try {
+			int testCount = countTestCases();
+			for (int i = 0; i < testCount; i++) {
+				// cleanupStores();
+				Test each = this.testAt(i);
+				if (result.shouldStop())
+					break;
+				runTest(each, result);
+			}
+			tearDown();
+		} catch (Exception e) {
+			throw new IllegalStateException(e);
+		}
+	}
 
-    protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
-        BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
-        String s = null;
-        List<String> ignores = new ArrayList<String>();
-        while ((s = reader.readLine()) != null) {
-            ignores.add(s);
-        }
-        reader.close();
-        return ignores;
-    }
+	protected static List<String> getFileList(String ignorePath)
+			throws FileNotFoundException, IOException {
+		BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+		String s = null;
+		List<String> ignores = new ArrayList<String>();
+		while ((s = reader.readLine()) != null) {
+			ignores.add(s);
+		}
+		reader.close();
+		return ignores;
+	}
 
-    private static String jobExtToResExt(String fname) {
-        int dot = fname.lastIndexOf('.');
-        return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
-    }
+	private static String jobExtToResExt(String fname) {
+		int dot = fname.lastIndexOf('.');
+		return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
+	}
 
-    private static boolean isInList(List<String> onlys, String name) {
-        for (String only : onlys)
-            if (name.indexOf(only) >= 0)
-                return true;
-        return false;
-    }
+	private static boolean isInList(List<String> onlys, String name) {
+		for (String only : onlys)
+			if (name.indexOf(only) >= 0)
+				return true;
+		return false;
+	}
 
 }
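
One behavior of the suite() filter above is easy to miss: isInList matches by substring (name.indexOf(only) >= 0), so an entry in only.txt or ignore.txt selects every job file whose name merely contains it, not just exact matches. A minimal, self-contained sketch of that behavior (the job file names below are hypothetical):

    // Illustrates the substring semantics of the suite's isInList filter.
    // The file names are made up for the example.
    import java.util.Arrays;
    import java.util.List;

    public class FilterDemo {
        private static boolean isInList(List<String> onlys, String name) {
            for (String only : onlys)
                if (name.indexOf(only) >= 0) // substring match, not equality
                    return true;
            return false;
        }

        public static void main(String[] args) {
            List<String> onlys = Arrays.asList("PageRank");
            System.out.println(isInList(onlys, "PageRank.xml"));        // true
            System.out.println(isInList(onlys, "PageRankTopDown.xml")); // true: contains "PageRank"
            System.out.println(isInList(onlys, "ShortestPaths.xml"));   // false
        }
    }
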
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 1c414ff..82be6b9 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -117,13 +117,6 @@
 			<version>0.2.3-SNAPSHOT</version>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
index e7caeaf..76c725e 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
@@ -34,7 +34,7 @@
 
     @Override
     public void stop() throws Exception {
-        LOGGER.info("Stopping Giraph NC Bootstrap");
+        LOGGER.info("Stopping NC Bootstrap");
         RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
         rCtx.close();
     }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 1b8fce4..a0dca3d 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -104,11 +104,13 @@
             private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
             private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
             private Configuration conf;
+            private boolean dynamicStateLength;
 
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
                 this.conf = confFactory.createConfiguration();
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
                 this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
 
@@ -241,8 +243,8 @@
             public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
                 try {
                     if (vertex != null && vertex.hasUpdate()) {
-                        if (!BspUtils.getDynamicVertexValueSize(conf)) {
-                            //in-place update
+                        if (!dynamicStateLength) {
+                            // in-place update
                             int fieldCount = tupleRef.getFieldCount();
                             for (int i = 1; i < fieldCount; i++) {
                                 byte[] data = tupleRef.getFieldData(i);
@@ -251,12 +253,12 @@
                                 vertex.write(output);
                             }
                         } else {
-                            //write the vertex id
+                            // write the vertex id
                             DataOutput tbOutput = cloneUpdateTb.getDataOutput();
                             vertex.getVertexId().write(tbOutput);
                             cloneUpdateTb.addFieldEndOffset();
 
-                            //write the vertex value
+                            // write the vertex value
                             vertex.write(tbOutput);
                             cloneUpdateTb.addFieldEndOffset();
                         }
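
Beyond the comment reformatting, the substantive change in this file is hoisting the BspUtils.getDynamicVertexValueSize(conf) call out of the per-tuple update() path into a dynamicStateLength field that open() sets once per task; Hadoop Configuration lookups are string-keyed probes, so this avoids repeating that cost for every updated vertex. A minimal sketch of the hoisting pattern, with an assumed class shape and property name (not Pregelix's actual ones):

    import org.apache.hadoop.conf.Configuration;

    class UpdateFunctionSketch {
        private boolean dynamicStateLength; // cached once per task

        void open(Configuration conf) {
            // One string-keyed Configuration probe per task; the key is assumed.
            dynamicStateLength = conf.getBoolean("pregelix.incStateLength", false);
        }

        void update() {
            if (!dynamicStateLength) {
                // fixed-size vertex value: overwrite the stored bytes in place
            } else {
                // variable-size vertex value: rebuild the tuple with id + value
            }
        }
    }
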
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index a4d54c8..3d8a355 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -79,7 +79,7 @@
             private ByteBuffer bufferGlobalAggregate;
             private GlobalAggregator aggregator;
 
-            //for writing out the global aggregate
+            // for writing out the global aggregate
             private IFrameWriter writerTerminate;
             private FrameTupleAppender appenderTerminate;
             private ByteBuffer bufferTerminate;
@@ -107,11 +107,13 @@
             private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
             private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
             private Configuration conf;
+            private boolean dynamicStateLength;
 
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
                 this.conf = confFactory.createConfiguration();
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
                 this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
 
@@ -215,7 +217,8 @@
             private void writeOutGlobalAggregate() throws HyracksDataException {
                 try {
                     /**
-                     * get partial aggregate result and flush to the final aggregator
+                     * get partial aggregate result and flush to the final
+                     * aggregator
                      */
                     Writable agg = aggregator.finishPartial();
                     agg.write(tbGlobalAggregate.getDataOutput());
@@ -244,8 +247,8 @@
             public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
                 try {
                     if (vertex != null && vertex.hasUpdate()) {
-                        if (!BspUtils.getDynamicVertexValueSize(conf)) {
-                            //in-place update
+                        if (!dynamicStateLength) {
+                            // in-place update
                             int fieldCount = tupleRef.getFieldCount();
                             for (int i = 1; i < fieldCount; i++) {
                                 byte[] data = tupleRef.getFieldData(i);
@@ -254,12 +257,12 @@
                                 vertex.write(output);
                             }
                         } else {
-                            //write the vertex id
+                            // write the vertex id
                             DataOutput tbOutput = cloneUpdateTb.getDataOutput();
                             vertex.getVertexId().write(tbOutput);
                             cloneUpdateTb.addFieldEndOffset();
 
-                            //write the vertex value
+                            // write the vertex value
                             vertex.write(tbOutput);
                             cloneUpdateTb.addFieldEndOffset();
                         }
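
The hunk above also rewraps the javadoc in writeOutGlobalAggregate(), which names the two-phase scheme used here: each task emits a partial aggregate via finishPartial(), and a single final aggregator merges the partials. A minimal sketch of that split under an assumed interface shape (not the actual GlobalAggregator API):

    import java.util.Arrays;
    import java.util.List;

    // Assumed two-phase aggregation interface, shown for a running sum.
    interface PartialAggregator {
        void step(long value); // fold one input into local state
        long finishPartial();  // emit this task's partial result
    }

    class SumAggregator implements PartialAggregator {
        private long sum;
        public void step(long value) { sum += value; }
        public long finishPartial() { return sum; }
    }

    public class TwoPhaseDemo {
        // Plays the role of the final aggregator: merges all partials.
        static long mergePartials(List<Long> partials) {
            long total = 0;
            for (long p : partials)
                total += p;
            return total;
        }

        public static void main(String[] args) {
            SumAggregator task1 = new SumAggregator();
            SumAggregator task2 = new SumAggregator();
            task1.step(3); task1.step(4);
            task2.step(5);
            System.out.println(mergePartials(Arrays.asList(
                    task1.finishPartial(), task2.finishPartial()))); // 12
        }
    }
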
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index f7d0018..d968262 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -15,12 +15,11 @@
 package edu.uci.ics.pregelix.runtime.touchpoint;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -40,14 +39,13 @@
     public IRuntimeHook createRuntimeHook() {
 
         return new IRuntimeHook() {
+            private ContextFactory ctxFactory = new ContextFactory();
 
-            @SuppressWarnings({ "rawtypes", "unchecked" })
             @Override
             public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
                 Configuration conf = confFactory.createConfiguration();
                 try {
-                    Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
-                            null);
+                    TaskAttemptContext mapperContext = ctxFactory.createContext(conf, null);
                     Vertex.setContext(mapperContext);
                     BspUtils.setDefaultConfiguration(conf);
                 } catch (Exception e) {
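
The final hunk replaces direct construction of Mapper.Context, which only compiles against Hadoop 0.20's API, with the hyracks-hdfs ContextFactory; only the createContext(conf, null) call is visible here, so the factory body below is an assumption about the bridging approach, not the actual implementation. One way such a factory can serve both Hadoop lineages is reflection: 0.20 ships TaskAttemptContext as a concrete class, while later releases turn it into an interface implemented by TaskAttemptContextImpl.

    import java.lang.reflect.Constructor;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    // Hypothetical sketch (names assumed): picks whichever context class the
    // classpath provides and instantiates it reflectively.
    public class BridgingContextFactory {
        public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid)
                throws Exception {
            Class<?> clazz;
            try {
                // New API (Hadoop 0.23+): TaskAttemptContext is an interface.
                clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
            } catch (ClassNotFoundException e) {
                // Old API (Hadoop 0.20.x): TaskAttemptContext is a concrete class.
                clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
            }
            Constructor<?> ctor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
            return (TaskAttemptContext) ctor.newInstance(conf,
                    tid == null ? new TaskAttemptID() : tid);
        }
    }
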