fix the class loader issue for hdfs write
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2910 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index e29848c..ff97a29 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -71,20 +71,24 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
- final JobConf conf = confFactory.getConf();
- final String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private String fileName = outputDirPath + File.separator + "part-" + partition;
private FSDataOutputStream dos;
private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
private FrameTupleReference tuple = new FrameTupleReference();
private ITupleWriter tupleWriter;
+ private ClassLoader ctxCL;
@Override
public void open() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ JobConf conf = confFactory.getConf();
+ String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
+ String fileName = outputDirPath + File.separator + "part-" + partition;
+
tupleWriter = tupleWriterFactory.getTupleWriter();
try {
FileSystem dfs = FileSystem.get(conf);
@@ -115,6 +119,8 @@
dos.close();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 32bb9dc..87c1c47 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -72,21 +72,25 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
- final Job conf = confFactory.getConf();
- final String outputPath = FileOutputFormat.getOutputPath(new JobContext(conf.getConfiguration(), new JobID()))
- .toString();
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private String fileName = outputPath + File.separator + "part-" + partition;
private FSDataOutputStream dos;
private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
private FrameTupleReference tuple = new FrameTupleReference();
private ITupleWriter tupleWriter;
+ private ClassLoader ctxCL;
@Override
public void open() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ Job conf = confFactory.getConf();
+ String outputPath = FileOutputFormat
+ .getOutputPath(new JobContext(conf.getConfiguration(), new JobID())).toString();
+ String fileName = outputPath + File.separator + "part-" + partition;
+
tupleWriter = tupleWriterFactory.getTupleWriter();
try {
FileSystem dfs = FileSystem.get(conf.getConfiguration());
@@ -117,6 +121,8 @@
dos.close();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}