cleanup hdfs class loader code in pregelix
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2914 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index ee3ac82..4cbd6c4 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -61,22 +61,21 @@
private FrameDeserializer frameDeserializer;
private final IFrameWriter[] writers = new IFrameWriter[outputArity];
private final IFunction function = functionFactory.createFunction();
+ private ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
@Override
- public void close() throws HyracksDataException {
- if (postHookFactory != null)
- postHookFactory.createRuntimeHook().configure(ctx);
- function.close();
+ public void open() throws HyracksDataException {
+ rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+ : inputRdFactory.createRecordDescriptor();
+ frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
for (IFrameWriter writer : writers) {
- writer.close();
+ writer.open();
}
- }
-
- @Override
- public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.fail();
- }
+ if (preHookFactory != null)
+ preHookFactory.createRuntimeHook().configure(ctx);
+ function.open(ctx, rd0, writers);
}
@Override
@@ -89,17 +88,21 @@
}
@Override
- public void open() throws HyracksDataException {
- rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
- : inputRdFactory.createRecordDescriptor();
- frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ public void close() throws HyracksDataException {
+ if (postHookFactory != null)
+ postHookFactory.createRuntimeHook().configure(ctx);
+ function.close();
for (IFrameWriter writer : writers) {
- writer.open();
+ writer.close();
}
- if (preHookFactory != null)
- preHookFactory.createRuntimeHook().configure(ctx);
- function.open(ctx, rd0, writers);
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.fail();
+ }
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 82ac18e..99bca1a 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -37,6 +37,7 @@
private final IFrameWriter[] writers;
private TupleDeserializer tupleDe;
private RecordDescriptor inputRd;
+ private ClassLoader ctxCL;
public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
@@ -57,6 +58,7 @@
public void functionOpen() throws HyracksDataException {
inputRd = inputRdFactory.createRecordDescriptor();
tupleDe = new TupleDeserializer(inputRd);
+ ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
for (IFrameWriter writer : writers) {
writer.open();
@@ -124,5 +126,6 @@
for (IFrameWriter writer : writers) {
writer.close();
}
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index c25e4c6..0133d76 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -64,17 +64,20 @@
return new AbstractUnaryInputSinkOperatorNodePushable() {
private RecordDescriptor rd0;
private FrameDeserializer frameDeserializer;
- private Configuration conf = confFactory.createConfiguration();
+ private Configuration conf;
private VertexWriter vertexWriter;
private TaskAttemptContext context;
private String TEMP_DIR = "_temporary";
+ private ClassLoader ctxCL;
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
: inputRdFactory.createRecordDescriptor();
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ conf = confFactory.createConfiguration();
VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
@@ -107,7 +110,7 @@
@Override
public void fail() throws HyracksDataException {
-
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
@Override
@@ -151,6 +154,8 @@
dfs.rename(srcFile, filePath);
} catch (IOException e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index cb0e339..ee16c08 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -83,12 +83,14 @@
final List<FileSplit> splits = splitsFactory.getSplits();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
- private Configuration conf = confFactory.createConfiguration();
+ private ClassLoader ctxCL;
@Override
public void initialize() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ Configuration conf = confFactory.createConfiguration();
writer.open();
for (int i = 0; i < scheduledLocations.length; i++) {
if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
@@ -102,12 +104,14 @@
continue;
}
}
- loadVertices(ctx, i);
+ loadVertices(ctx, conf, i);
}
}
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
@@ -122,8 +126,9 @@
* @throws InterruptedException
*/
@SuppressWarnings("unchecked")
- private void loadVertices(final IHyracksTaskContext ctx, int splitId) throws IOException,
- ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+ private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
+ throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
+ IllegalAccessException {
ByteBuffer frame = ctx.allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(frame, true);