cleanup hdfs class loader code in pregelix

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2914 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index ee3ac82..4cbd6c4 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -61,22 +61,21 @@
             private FrameDeserializer frameDeserializer;
             private final IFrameWriter[] writers = new IFrameWriter[outputArity];
             private final IFunction function = functionFactory.createFunction();
+            private ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
 
             @Override
-            public void close() throws HyracksDataException {
-                if (postHookFactory != null)
-                    postHookFactory.createRuntimeHook().configure(ctx);
-                function.close();
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                 for (IFrameWriter writer : writers) {
-                    writer.close();
+                    writer.open();
                 }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.fail();
-                }
+                if (preHookFactory != null)
+                    preHookFactory.createRuntimeHook().configure(ctx);
+                function.open(ctx, rd0, writers);
             }
 
             @Override
@@ -89,17 +88,21 @@
             }
 
             @Override
-            public void open() throws HyracksDataException {
-                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
-                        : inputRdFactory.createRecordDescriptor();
-                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
-                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+            public void close() throws HyracksDataException {
+                if (postHookFactory != null)
+                    postHookFactory.createRuntimeHook().configure(ctx);
+                function.close();
                 for (IFrameWriter writer : writers) {
-                    writer.open();
+                    writer.close();
                 }
-                if (preHookFactory != null)
-                    preHookFactory.createRuntimeHook().configure(ctx);
-                function.open(ctx, rd0, writers);
+                Thread.currentThread().setContextClassLoader(ctxCL);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.fail();
+                }
             }
 
             @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 82ac18e..99bca1a 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -37,6 +37,7 @@
     private final IFrameWriter[] writers;
     private TupleDeserializer tupleDe;
     private RecordDescriptor inputRd;
+    private ClassLoader ctxCL;
 
     public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
             IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
@@ -57,6 +58,7 @@
     public void functionOpen() throws HyracksDataException {
         inputRd = inputRdFactory.createRecordDescriptor();
         tupleDe = new TupleDeserializer(inputRd);
+        ctxCL = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         for (IFrameWriter writer : writers) {
             writer.open();
@@ -124,5 +126,6 @@
         for (IFrameWriter writer : writers) {
             writer.close();
         }
+        Thread.currentThread().setContextClassLoader(ctxCL);
     }
 }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index c25e4c6..0133d76 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -64,17 +64,20 @@
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             private RecordDescriptor rd0;
             private FrameDeserializer frameDeserializer;
-            private Configuration conf = confFactory.createConfiguration();
+            private Configuration conf;
             private VertexWriter vertexWriter;
             private TaskAttemptContext context;
             private String TEMP_DIR = "_temporary";
+            private ClassLoader ctxCL;
 
             @Override
             public void open() throws HyracksDataException {
                 rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
                         : inputRdFactory.createRecordDescriptor();
                 frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                ctxCL = Thread.currentThread().getContextClassLoader();
                 Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                conf = confFactory.createConfiguration();
 
                 VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
                 TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
@@ -107,7 +110,7 @@
 
             @Override
             public void fail() throws HyracksDataException {
-
+                Thread.currentThread().setContextClassLoader(ctxCL);
             }
 
             @Override
@@ -151,6 +154,8 @@
                     dfs.rename(srcFile, filePath);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index cb0e339..ee16c08 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -83,12 +83,14 @@
         final List<FileSplit> splits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
-            private Configuration conf = confFactory.createConfiguration();
+            private ClassLoader ctxCL;
 
             @Override
             public void initialize() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                    Configuration conf = confFactory.createConfiguration();
                     writer.open();
                     for (int i = 0; i < scheduledLocations.length; i++) {
                         if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
@@ -102,12 +104,14 @@
                                     continue;
                                 }
                             }
-                            loadVertices(ctx, i);
+                            loadVertices(ctx, conf, i);
                         }
                     }
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
@@ -122,8 +126,9 @@
              * @throws InterruptedException
              */
             @SuppressWarnings("unchecked")
-            private void loadVertices(final IHyracksTaskContext ctx, int splitId) throws IOException,
-                    ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+            private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
+                    throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
+                    IllegalAccessException {
                 ByteBuffer frame = ctx.allocateFrame();
                 FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 appender.reset(frame, true);