svn merge -r2910:2939  


git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2940 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 2caa93b..0643804 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -77,11 +77,11 @@
 			<scope>test</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
+                        <groupId>edu.uci.ics.hyracks</groupId>
+                        <artifactId>hyracks-hdfs-core</artifactId>
+                        <version>0.2.3-SNAPSHOT</version>
+                        <type>jar</type>
+                        <scope>compile</scope>
+                </dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index b7f9e3d..a8cd3db 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -26,7 +26,7 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -66,7 +66,7 @@
     /** List of incoming messages from the previous superstep */
     private final List<M> msgList = new ArrayList<M>();
     /** map context */
-    private static Mapper.Context context = null;
+    private static TaskAttemptContext context = null;
     /** a delegate for hyracks stuff */
     private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
     /** this vertex is updated or not */
@@ -444,8 +444,10 @@
     /**
      * Add a new vertex into the graph
      * 
-     * @param vertexId the vertex id
-     * @param vertex the vertex
+     * @param vertexId
+     *            the vertex id
+     * @param vertex
+     *            the vertex
      */
     public final void addVertex(I vertexId, V vertex) {
         delegate.addVertex(vertexId, vertex);
@@ -454,7 +456,8 @@
     /**
      * Delete a vertex from id
      * 
-     * @param vertexId  the vertex id
+     * @param vertexId
+     *            the vertex id
      */
     public final void deleteVertex(I vertexId) {
         delegate.deleteVertex(vertexId);
@@ -528,7 +531,7 @@
     /**
      * Pregelix internal use only
      */
-    public static final Mapper<?, ?, ?, ?>.Context getContext() {
+    public static final TaskAttemptContext getContext() {
         return context;
     }
 
@@ -537,7 +540,7 @@
      * 
      * @param context
      */
-    public static final void setContext(Mapper<?, ?, ?, ?>.Context context) {
+    public static final void setContext(TaskAttemptContext context) {
         Vertex.context = context;
     }
 
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 34faa82..0908829 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -244,11 +244,6 @@
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
@@ -281,13 +276,6 @@
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-test</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
 			<groupId>com.kenai.nbpwr</groupId>
 			<artifactId>org-apache-commons-io</artifactId>
 			<version>1.3.1-201002241208</version>
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 6c039bf..3f3ba00 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow-std</artifactId>
 	<packaging>jar</packaging>
 	<name>pregelix-dataflow-std</name>
 
 	<parent>
-    		<groupId>edu.uci.ics.hyracks</groupId>
-    		<artifactId>pregelix</artifactId>
-    		<version>0.2.3-SNAPSHOT</version>
-  	</parent>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<artifactId>pregelix</artifactId>
+		<version>0.2.3-SNAPSHOT</version>
+	</parent>
 
 
 	<properties>
@@ -101,11 +102,13 @@
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-data-std</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
@@ -144,12 +147,5 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
-		<dependency>
-			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-hdfs</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index ee3ac82..4cbd6c4 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -61,22 +61,21 @@
             private FrameDeserializer frameDeserializer;
             private final IFrameWriter[] writers = new IFrameWriter[outputArity];
             private final IFunction function = functionFactory.createFunction();
+            private ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
 
             @Override
-            public void close() throws HyracksDataException {
-                if (postHookFactory != null)
-                    postHookFactory.createRuntimeHook().configure(ctx);
-                function.close();
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                 for (IFrameWriter writer : writers) {
-                    writer.close();
+                    writer.open();
                 }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.fail();
-                }
+                if (preHookFactory != null)
+                    preHookFactory.createRuntimeHook().configure(ctx);
+                function.open(ctx, rd0, writers);
             }
 
             @Override
@@ -89,17 +88,21 @@
             }
 
             @Override
-            public void open() throws HyracksDataException {
-                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
-                        : inputRdFactory.createRecordDescriptor();
-                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
-                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+            public void close() throws HyracksDataException {
+                if (postHookFactory != null)
+                    postHookFactory.createRuntimeHook().configure(ctx);
+                function.close();
                 for (IFrameWriter writer : writers) {
-                    writer.open();
+                    writer.close();
                 }
-                if (preHookFactory != null)
-                    preHookFactory.createRuntimeHook().configure(ctx);
-                function.open(ctx, rd0, writers);
+                Thread.currentThread().setContextClassLoader(ctxCL);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.fail();
+                }
             }
 
             @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 82ac18e..99bca1a 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -37,6 +37,7 @@
     private final IFrameWriter[] writers;
     private TupleDeserializer tupleDe;
     private RecordDescriptor inputRd;
+    private ClassLoader ctxCL;
 
     public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
             IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
@@ -57,6 +58,7 @@
     public void functionOpen() throws HyracksDataException {
         inputRd = inputRdFactory.createRecordDescriptor();
         tupleDe = new TupleDeserializer(inputRd);
+        ctxCL = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         for (IFrameWriter writer : writers) {
             writer.open();
@@ -124,5 +126,6 @@
         for (IFrameWriter writer : writers) {
             writer.close();
         }
+        Thread.currentThread().setContextClassLoader(ctxCL);
     }
 }
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 31b6adc..107f059 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow</artifactId>
 	<packaging>jar</packaging>
 	<name>pregelix-dataflow</name>
 
 	<parent>
-    		<groupId>edu.uci.ics.hyracks</groupId>
-    		<artifactId>pregelix</artifactId>
-    		<version>0.2.3-SNAPSHOT</version>
-  	</parent>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<artifactId>pregelix</artifactId>
+		<version>0.2.3-SNAPSHOT</version>
+	</parent>
 
 
 	<properties>
@@ -103,13 +104,6 @@
 			<version>0.2.3-SNAPSHOT</version>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
@@ -144,12 +138,5 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
-		<dependency>
-			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-hdfs</artifactId>
-			<version>0.2.3-SNAPSHOT</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index c25e4c6..0133d76 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -64,17 +64,20 @@
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             private RecordDescriptor rd0;
             private FrameDeserializer frameDeserializer;
-            private Configuration conf = confFactory.createConfiguration();
+            private Configuration conf;
             private VertexWriter vertexWriter;
             private TaskAttemptContext context;
             private String TEMP_DIR = "_temporary";
+            private ClassLoader ctxCL;
 
             @Override
             public void open() throws HyracksDataException {
                 rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
                         : inputRdFactory.createRecordDescriptor();
                 frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                ctxCL = Thread.currentThread().getContextClassLoader();
                 Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                conf = confFactory.createConfiguration();
 
                 VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
                 TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
@@ -107,7 +110,7 @@
 
             @Override
             public void fail() throws HyracksDataException {
-
+                Thread.currentThread().setContextClassLoader(ctxCL);
             }
 
             @Override
@@ -151,6 +154,8 @@
                     dfs.rename(srcFile, filePath);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index cb0e339..a38b19e 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -25,8 +25,6 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -43,6 +41,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
 import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
@@ -83,12 +82,15 @@
         final List<FileSplit> splits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
-            private Configuration conf = confFactory.createConfiguration();
+            private ClassLoader ctxCL;
+            private ContextFactory ctxFactory = new ContextFactory();
 
             @Override
             public void initialize() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                    Configuration conf = confFactory.createConfiguration();
                     writer.open();
                     for (int i = 0; i < scheduledLocations.length; i++) {
                         if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
@@ -102,12 +104,14 @@
                                     continue;
                                 }
                             }
-                            loadVertices(ctx, i);
+                            loadVertices(ctx, conf, i);
                         }
                     }
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
@@ -122,8 +126,9 @@
              * @throws InterruptedException
              */
             @SuppressWarnings("unchecked")
-            private void loadVertices(final IHyracksTaskContext ctx, int splitId) throws IOException,
-                    ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+            private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
+                    throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
+                    IllegalAccessException {
                 ByteBuffer frame = ctx.allocateFrame();
                 FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 appender.reset(frame, true);
@@ -141,8 +146,7 @@
                 /**
                  * set context
                  */
-                Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
-                        splits.get(splitId));
+                TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splits.get(splitId));
                 Vertex.setContext(mapperContext);
 
                 /**
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 1c414ff..82be6b9 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -117,13 +117,6 @@
 			<version>0.2.3-SNAPSHOT</version>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index f7d0018..d968262 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -15,12 +15,11 @@
 package edu.uci.ics.pregelix.runtime.touchpoint;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -40,14 +39,13 @@
     public IRuntimeHook createRuntimeHook() {
 
         return new IRuntimeHook() {
+            private ContextFactory ctxFactory = new ContextFactory();
 
-            @SuppressWarnings({ "rawtypes", "unchecked" })
             @Override
             public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
                 Configuration conf = confFactory.createConfiguration();
                 try {
-                    Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
-                            null);
+                    TaskAttemptContext mapperContext = ctxFactory.createContext(conf, null);
                     Vertex.setContext(mapperContext);
                     BspUtils.setDefaultConfiguration(conf);
                 } catch (Exception e) {