change pregelix refection code to use the proper classloader
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3381 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
index f1cfee8..f8c5b6a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
@@ -28,6 +28,8 @@
public ICounterContext getCounterContext();
public Object getGlobalJobData();
-
+
public Class<?> loadClass(String className);
+
+ public ClassLoader getClassLoader();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
index 1e3f0ad..d402cd3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
@@ -31,4 +31,6 @@
public void addClassPathURLs(List<URL> binaryURLs) throws HyracksException;
+ public ClassLoader getClassLoader() throws HyracksException;
+
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
index 59571c2..0ea4309 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
@@ -56,4 +56,9 @@
}
}
+ @Override
+ public ClassLoader getClassLoader() throws HyracksException {
+ return this.getClass().getClassLoader();
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 789e48f..9b8a996 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -392,8 +392,8 @@
case CLI_UNDEPLOY_BINARY: {
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
- workQueue
- .schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId()));
+ workQueue.schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
+ new IPCResponder<DeploymentId>(handle, mid)));
return;
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
index d7e5df8..8e429c9 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
@@ -25,20 +25,24 @@
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.IPCResponder;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-public class CliUnDeployBinaryWork extends AbstractWork {
+public class CliUnDeployBinaryWork extends SynchronizableWork {
private ClusterControllerService ccs;
private DeploymentId deploymentId;
+ private IPCResponder<DeploymentId> callback;
- public CliUnDeployBinaryWork(ClusterControllerService ncs, DeploymentId deploymentId) {
+ public CliUnDeployBinaryWork(ClusterControllerService ncs, DeploymentId deploymentId,
+ IPCResponder<DeploymentId> callback) {
this.ccs = ncs;
this.deploymentId = deploymentId;
+ this.callback = callback;
}
@Override
- public void run() {
+ public void doRun() {
try {
if (deploymentId == null) {
deploymentId = new DeploymentId(UUID.randomUUID().toString());
@@ -58,7 +62,7 @@
for (String nc : nodeControllerStateMap.keySet()) {
nodeIds.add(nc);
}
- DeploymentRun dRun = new DeploymentRun(nodeIds);
+ final DeploymentRun dRun = new DeploymentRun(nodeIds);
ccs.addDeploymentRun(deploymentId, dRun);
/***
@@ -68,10 +72,21 @@
ncs.getNodeController().undeployBinary(deploymentId);
}
- /**
- * wait for completion
- */
- dRun.waitForCompletion();
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ /**
+ * wait for completion
+ */
+ dRun.waitForCompletion();
+ ccs.removeDeploymentRun(deploymentId);
+ callback.setValue(null);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+ });
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index 7a0340a..25420a2 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -76,4 +76,9 @@
throw new HyracksException(e);
}
}
+
+ @Override
+ public ClassLoader getClassLoader() throws HyracksException {
+ return classLoader;
+ }
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index e95b85f..6ec8a14 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -83,6 +83,19 @@
}
}
+ public static ClassLoader getClassLoader(DeploymentId deploymentId, IApplicationContext appCtx)
+ throws HyracksException {
+ try {
+ IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
+ IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
+ .getJobSerializerDeerializer(deploymentId);
+ ClassLoader cl = jobSerDe == null ? DeploymentUtils.class.getClassLoader() : jobSerDe.getClassLoader();
+ return cl;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
try {
List<URL> downloadedFileURLs = new ArrayList<URL>();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 8ac17d9..c6a03dc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -297,4 +297,13 @@
throw new RuntimeException(e);
}
}
+
+ @Override
+ public ClassLoader getClassLoader() {
+ try {
+ return DeploymentUtils.getClassLoader(deploymentId, appCtx);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 9e9abdf..8b58ecd 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -114,6 +114,7 @@
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
Job job = confFactory.getConf();
+ job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
writer.open();
InputFormat inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(),
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
index e4d94ff..8285efe 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
@@ -101,4 +101,9 @@
throw new RuntimeException(e);
}
}
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return this.getClass().getClassLoader();
+ }
}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index d1a665c..1f071bf 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -144,6 +144,7 @@
end = System.currentTimeMillis();
time = end - start;
LOG.info("result writing finished " + time + "ms");
+ hcc.unDeployBinary(deploymentId);
LOG.info("job finished");
} catch (Exception e) {
throw new HyracksException(e);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
index f3089ba..d225eb4 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
@@ -17,6 +17,7 @@
import org.apache.hadoop.conf.Configuration;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.api.util.SerDeUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -34,11 +35,11 @@
}
@Override
- public Configuration createConfiguration() throws HyracksDataException {
+ public Configuration createConfiguration(IHyracksTaskContext ctx) throws HyracksDataException {
try {
Configuration conf = new Configuration();
+ conf.setClassLoader(ctx.getJobletContext().getClassLoader());
SerDeUtils.deserialize(conf, data);
- conf.setClassLoader(this.getClass().getClassLoader());
return conf;
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
index 4be0bed..dc75b07 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
@@ -16,9 +16,11 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
public interface IAggregateFunctionFactory extends Serializable {
- public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException;
+ public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx,
+ IDataOutputProvider provider) throws HyracksException;
}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
index 62aeeea..71592ee 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
@@ -56,7 +56,7 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private Configuration conf = confFactory.createConfiguration();
+ private Configuration conf = confFactory.createConfiguration(ctx);
@SuppressWarnings("rawtypes")
private GlobalAggregator aggregator = BspUtils.createGlobalAggregator(conf);
private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 8bc91c2..1d9c778 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -79,10 +79,11 @@
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- conf = confFactory.createConfiguration();
+ conf = confFactory.createConfiguration(ctx);
VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
context = ctxFactory.createContext(conf, partition);
+ context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
try {
vertexWriter = outputFormat.createVertexWriter(context);
} catch (InterruptedException e) {
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
index 88a0dda..f54d176 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
@@ -45,10 +45,10 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private Configuration conf = confFactory.createConfiguration();
+ private Configuration conf = confFactory.createConfiguration(ctx);
private boolean terminate = true;
@Override
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index 0da7baf..343ba7e 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -81,15 +81,12 @@
final List<FileSplit> splits = splitsFactory.getSplits();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
- private ClassLoader ctxCL;
private ContextFactory ctxFactory = new ContextFactory();
@Override
public void initialize() throws HyracksDataException {
- ctxCL = Thread.currentThread().getContextClassLoader();
try {
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- Configuration conf = confFactory.createConfiguration();
+ Configuration conf = confFactory.createConfiguration(ctx);
writer.open();
for (int i = 0; i < scheduledLocations.length; i++) {
if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
@@ -109,8 +106,6 @@
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
}
}
@@ -135,10 +130,11 @@
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
InputSplit split = splits.get(splitId);
TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splitId);
+ mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
VertexReader vertexReader = vertexInputFormat.createVertexReader(split, mapperContext);
vertexReader.initialize(split, mapperContext);
- Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
+ Vertex readerVertex = (Vertex) BspUtils.createVertex(mapperContext.getConfiguration());
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
DataOutput dos = tb.getDataOutput();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
index b31f376..2e58196 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
@@ -18,10 +18,11 @@
import org.apache.hadoop.conf.Configuration;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IConfigurationFactory extends Serializable {
- public Configuration createConfiguration() throws HyracksDataException;
+ public Configuration createConfiguration(IHyracksTaskContext ctx) throws HyracksDataException;
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
index a8a752e..8d0b776 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
@@ -39,7 +39,7 @@
@Override
public VertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
- return new TextPageRankGraphReader(textInputFormat.createRecordReader(split, context));
+ return new TextPageRankGraphReader(textInputFormat.createRecordReader(split, context), context);
}
}
@@ -52,7 +52,7 @@
private List<VLongWritable> pool = new ArrayList<VLongWritable>();
private int used = 0;
- public TextPageRankGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+ public TextPageRankGraphReader(RecordReader<LongWritable, Text> lineRecordReader, TaskAttemptContext context) {
super(lineRecordReader);
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 0c09757..0a0a14f 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -109,7 +109,7 @@
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
+ this.conf = confFactory.createConfiguration(ctx);
this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 1bf6a2b..9998205 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -112,7 +112,7 @@
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
+ this.conf = confFactory.createConfiguration(ctx);
this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 8f63b6e..851a83a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -40,7 +40,7 @@
@SuppressWarnings("unchecked")
@Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+ public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
return new IAggregatorDescriptor() {
@@ -113,7 +113,7 @@
for (int i = 0; i < agg.length; i++) {
aggOutput[i] = new ArrayBackedValueStorage();
try {
- agg[i] = aggFactories[i].createAggregateFunction(aggOutput[i]);
+ agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i]);
} catch (Exception e) {
throw new IllegalStateException(e);
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 1813dcc..3cf46a2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
@@ -53,9 +54,9 @@
private MsgList msgList = new MsgList();
private boolean keyRead = false;
- public AggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage,
- boolean partialAggAsInput) throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
+ public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput output,
+ boolean isFinalStage, boolean partialAggAsInput) throws HyracksDataException {
+ this.conf = confFactory.createConfiguration(ctx);
this.output = output;
this.isFinalStage = isFinalStage;
this.partialAggAsInput = partialAggAsInput;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index a09f688..7ce9e1d 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -17,6 +17,7 @@
import java.io.DataOutput;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -36,8 +37,9 @@
}
@Override
- public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException {
+ public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider)
+ throws HyracksException {
DataOutput output = provider.getDataOutput();
- return new AggregationFunction(confFactory, output, isFinalStage, partialAggAsInput);
+ return new AggregationFunction(ctx, confFactory, output, isFinalStage, partialAggAsInput);
}
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
index 5f0ed9e..850ae1e 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -39,7 +39,7 @@
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
- Configuration conf = confFactory.createConfiguration();
+ Configuration conf = confFactory.createConfiguration(ctx);
IterationUtils.setProperties(giraphJobId, ctx, conf);
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index c025f85..05b1542 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -44,9 +44,10 @@
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
- Configuration conf = confFactory.createConfiguration();
+ Configuration conf = confFactory.createConfiguration(ctx);
try {
TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
+ mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
Vertex.setContext(mapperContext);
BspUtils.setDefaultConfiguration(conf);
} catch (Exception e) {