fix the class loader issue for hdfs write

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2910 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index e29848c..ff97a29 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -71,20 +71,24 @@
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
-        final JobConf conf = confFactory.getConf();
-        final String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
 
-            private String fileName = outputDirPath + File.separator + "part-" + partition;
             private FSDataOutputStream dos;
             private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
             private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
             private FrameTupleReference tuple = new FrameTupleReference();
             private ITupleWriter tupleWriter;
+            private ClassLoader ctxCL;
 
             @Override
             public void open() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                JobConf conf = confFactory.getConf();
+                String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
+                String fileName = outputDirPath + File.separator + "part-" + partition;
+
                 tupleWriter = tupleWriterFactory.getTupleWriter();
                 try {
                     FileSystem dfs = FileSystem.get(conf);
@@ -115,6 +119,8 @@
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 32bb9dc..87c1c47 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -72,21 +72,25 @@
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
-        final Job conf = confFactory.getConf();
-        final String outputPath = FileOutputFormat.getOutputPath(new JobContext(conf.getConfiguration(), new JobID()))
-                .toString();
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
 
-            private String fileName = outputPath + File.separator + "part-" + partition;
             private FSDataOutputStream dos;
             private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
             private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
             private FrameTupleReference tuple = new FrameTupleReference();
             private ITupleWriter tupleWriter;
+            private ClassLoader ctxCL;
 
             @Override
             public void open() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                Job conf = confFactory.getConf();
+                String outputPath = FileOutputFormat
+                        .getOutputPath(new JobContext(conf.getConfiguration(), new JobID())).toString();
+                String fileName = outputPath + File.separator + "part-" + partition;
+
                 tupleWriter = tupleWriterFactory.getTupleWriter();
                 try {
                     FileSystem dfs = FileSystem.get(conf.getConfiguration());
@@ -117,6 +121,8 @@
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }