change pregelix refection code to use the proper classloader

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3381 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
index f1cfee8..f8c5b6a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
@@ -28,6 +28,8 @@
     public ICounterContext getCounterContext();
 
     public Object getGlobalJobData();
-    
+
     public Class<?> loadClass(String className);
+
+    public ClassLoader getClassLoader();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
index 1e3f0ad..d402cd3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
@@ -31,4 +31,6 @@
 
     public void addClassPathURLs(List<URL> binaryURLs) throws HyracksException;
 
+    public ClassLoader getClassLoader() throws HyracksException;
+
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
index 59571c2..0ea4309 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
@@ -56,4 +56,9 @@
         }
     }
 
+    @Override
+    public ClassLoader getClassLoader() throws HyracksException {
+        return this.getClass().getClassLoader();
+    }
+
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 789e48f..9b8a996 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -392,8 +392,8 @@
 
                 case CLI_UNDEPLOY_BINARY: {
                     HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
-                    workQueue
-                            .schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId()));
+                    workQueue.schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
+                            new IPCResponder<DeploymentId>(handle, mid)));
                     return;
                 }
             }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
index d7e5df8..8e429c9 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
@@ -25,20 +25,24 @@
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.IPCResponder;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
-public class CliUnDeployBinaryWork extends AbstractWork {
+public class CliUnDeployBinaryWork extends SynchronizableWork {
 
     private ClusterControllerService ccs;
     private DeploymentId deploymentId;
+    private IPCResponder<DeploymentId> callback;
 
-    public CliUnDeployBinaryWork(ClusterControllerService ncs, DeploymentId deploymentId) {
+    public CliUnDeployBinaryWork(ClusterControllerService ncs, DeploymentId deploymentId,
+            IPCResponder<DeploymentId> callback) {
         this.ccs = ncs;
         this.deploymentId = deploymentId;
+        this.callback = callback;
     }
 
     @Override
-    public void run() {
+    public void doRun() {
         try {
             if (deploymentId == null) {
                 deploymentId = new DeploymentId(UUID.randomUUID().toString());
@@ -58,7 +62,7 @@
             for (String nc : nodeControllerStateMap.keySet()) {
                 nodeIds.add(nc);
             }
-            DeploymentRun dRun = new DeploymentRun(nodeIds);
+            final DeploymentRun dRun = new DeploymentRun(nodeIds);
             ccs.addDeploymentRun(deploymentId, dRun);
 
             /***
@@ -68,10 +72,21 @@
                 ncs.getNodeController().undeployBinary(deploymentId);
             }
 
-            /**
-             * wait for completion
-             */
-            dRun.waitForCompletion();
+            ccs.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        /**
+                         * wait for completion
+                         */
+                        dRun.waitForCompletion();
+                        ccs.removeDeploymentRun(deploymentId);
+                        callback.setValue(null);
+                    } catch (Exception e) {
+                        callback.setException(e);
+                    }
+                }
+            });
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index 7a0340a..25420a2 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -76,4 +76,9 @@
             throw new HyracksException(e);
         }
     }
+
+    @Override
+    public ClassLoader getClassLoader() throws HyracksException {
+        return classLoader;
+    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index e95b85f..6ec8a14 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -83,6 +83,19 @@
         }
     }
 
+    public static ClassLoader getClassLoader(DeploymentId deploymentId, IApplicationContext appCtx)
+            throws HyracksException {
+        try {
+            IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
+            IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
+                    .getJobSerializerDeerializer(deploymentId);
+            ClassLoader cl = jobSerDe == null ? DeploymentUtils.class.getClassLoader() : jobSerDe.getClassLoader();
+            return cl;
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
     private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
         try {
             List<URL> downloadedFileURLs = new ArrayList<URL>();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 8ac17d9..c6a03dc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -297,4 +297,13 @@
             throw new RuntimeException(e);
         }
     }
+
+    @Override
+    public ClassLoader getClassLoader() {
+        try {
+            return DeploymentUtils.getClassLoader(deploymentId, appCtx);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 9e9abdf..8b58ecd 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -114,6 +114,7 @@
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                     Job job = confFactory.getConf();
+                    job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
                     writer.open();
                     InputFormat inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(),
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
index e4d94ff..8285efe 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
@@ -101,4 +101,9 @@
             throw new RuntimeException(e);
         }
     }
+
+    @Override
+    public ClassLoader getClassLoader() {
+        return this.getClass().getClassLoader();
+    }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index d1a665c..1f071bf 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -144,6 +144,7 @@
             end = System.currentTimeMillis();
             time = end - start;
             LOG.info("result writing finished " + time + "ms");
+            hcc.unDeployBinary(deploymentId);
             LOG.info("job finished");
         } catch (Exception e) {
             throw new HyracksException(e);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
index f3089ba..d225eb4 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
@@ -17,6 +17,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.util.SerDeUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -34,11 +35,11 @@
     }
 
     @Override
-    public Configuration createConfiguration() throws HyracksDataException {
+    public Configuration createConfiguration(IHyracksTaskContext ctx) throws HyracksDataException {
         try {
             Configuration conf = new Configuration();
+            conf.setClassLoader(ctx.getJobletContext().getClassLoader());
             SerDeUtils.deserialize(conf, data);
-            conf.setClassLoader(this.getClass().getClassLoader());
             return conf;
         } catch (Exception e) {
             throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
index 4be0bed..dc75b07 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
@@ -16,9 +16,11 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 
 public interface IAggregateFunctionFactory extends Serializable {
-    public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException;
+	public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx,
+			IDataOutputProvider provider) throws HyracksException;
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
index 62aeeea..71592ee 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
@@ -56,7 +56,7 @@
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new AbstractUnaryInputSinkOperatorNodePushable() {
-            private Configuration conf = confFactory.createConfiguration();
+            private Configuration conf = confFactory.createConfiguration(ctx);
             @SuppressWarnings("rawtypes")
             private GlobalAggregator aggregator = BspUtils.createGlobalAggregator(conf);
             private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 8bc91c2..1d9c778 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -79,10 +79,11 @@
                 frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
                 ctxCL = Thread.currentThread().getContextClassLoader();
                 Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-                conf = confFactory.createConfiguration();
+                conf = confFactory.createConfiguration(ctx);
 
                 VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
                 context = ctxFactory.createContext(conf, partition);
+                context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
                 try {
                     vertexWriter = outputFormat.createVertexWriter(context);
                 } catch (InterruptedException e) {
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
index 88a0dda..f54d176 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
@@ -45,10 +45,10 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new AbstractUnaryInputSinkOperatorNodePushable() {
-            private Configuration conf = confFactory.createConfiguration();
+            private Configuration conf = confFactory.createConfiguration(ctx);
             private boolean terminate = true;
 
             @Override
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index 0da7baf..343ba7e 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -81,15 +81,12 @@
         final List<FileSplit> splits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
-            private ClassLoader ctxCL;
             private ContextFactory ctxFactory = new ContextFactory();
 
             @Override
             public void initialize() throws HyracksDataException {
-                ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
-                    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-                    Configuration conf = confFactory.createConfiguration();
+                    Configuration conf = confFactory.createConfiguration(ctx);
                     writer.open();
                     for (int i = 0; i < scheduledLocations.length; i++) {
                         if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
@@ -109,8 +106,6 @@
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
-                } finally {
-                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
@@ -135,10 +130,11 @@
                 VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
                 InputSplit split = splits.get(splitId);
                 TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splitId);
+                mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
 
                 VertexReader vertexReader = vertexInputFormat.createVertexReader(split, mapperContext);
                 vertexReader.initialize(split, mapperContext);
-                Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
+                Vertex readerVertex = (Vertex) BspUtils.createVertex(mapperContext.getConfiguration());
                 ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
                 DataOutput dos = tb.getDataOutput();
 
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
index b31f376..2e58196 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
@@ -18,10 +18,11 @@
 
 import org.apache.hadoop.conf.Configuration;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface IConfigurationFactory extends Serializable {
 
-    public Configuration createConfiguration() throws HyracksDataException;
+    public Configuration createConfiguration(IHyracksTaskContext ctx) throws HyracksDataException;
 
 }
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
index a8a752e..8d0b776 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
@@ -39,7 +39,7 @@
     @Override
     public VertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> createVertexReader(
             InputSplit split, TaskAttemptContext context) throws IOException {
-        return new TextPageRankGraphReader(textInputFormat.createRecordReader(split, context));
+        return new TextPageRankGraphReader(textInputFormat.createRecordReader(split, context), context);
     }
 }
 
@@ -52,7 +52,7 @@
     private List<VLongWritable> pool = new ArrayList<VLongWritable>();
     private int used = 0;
 
-    public TextPageRankGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+    public TextPageRankGraphReader(RecordReader<LongWritable, Text> lineRecordReader, TaskAttemptContext context) {
         super(lineRecordReader);
     }
 
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 0c09757..0a0a14f 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -109,7 +109,7 @@
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
-                this.conf = confFactory.createConfiguration();
+                this.conf = confFactory.createConfiguration(ctx);
                 this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
                 this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 1bf6a2b..9998205 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -112,7 +112,7 @@
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
-                this.conf = confFactory.createConfiguration();
+                this.conf = confFactory.createConfiguration(ctx);
                 this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
                 this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 8f63b6e..851a83a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -40,7 +40,7 @@
 
     @SuppressWarnings("unchecked")
     @Override
-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+    public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
 
         return new IAggregatorDescriptor() {
@@ -113,7 +113,7 @@
                 for (int i = 0; i < agg.length; i++) {
                     aggOutput[i] = new ArrayBackedValueStorage();
                     try {
-                        agg[i] = aggFactories[i].createAggregateFunction(aggOutput[i]);
+                        agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i]);
                     } catch (Exception e) {
                         throw new IllegalStateException(e);
                     }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 1813dcc..3cf46a2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
@@ -53,9 +54,9 @@
     private MsgList msgList = new MsgList();
     private boolean keyRead = false;
 
-    public AggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage,
-            boolean partialAggAsInput) throws HyracksDataException {
-        this.conf = confFactory.createConfiguration();
+    public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput output,
+            boolean isFinalStage, boolean partialAggAsInput) throws HyracksDataException {
+        this.conf = confFactory.createConfiguration(ctx);
         this.output = output;
         this.isFinalStage = isFinalStage;
         this.partialAggAsInput = partialAggAsInput;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index a09f688..7ce9e1d 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -17,6 +17,7 @@
 
 import java.io.DataOutput;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -36,8 +37,9 @@
     }
 
     @Override
-    public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException {
+    public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider)
+            throws HyracksException {
         DataOutput output = provider.getDataOutput();
-        return new AggregationFunction(confFactory, output, isFinalStage, partialAggAsInput);
+        return new AggregationFunction(ctx, confFactory, output, isFinalStage, partialAggAsInput);
     }
 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
index 5f0ed9e..850ae1e 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -39,7 +39,7 @@
 
             @Override
             public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
-                Configuration conf = confFactory.createConfiguration();
+                Configuration conf = confFactory.createConfiguration(ctx);
                 IterationUtils.setProperties(giraphJobId, ctx, conf);
             }
 
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index c025f85..05b1542 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -44,9 +44,10 @@
 
             @Override
             public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
-                Configuration conf = confFactory.createConfiguration();
+                Configuration conf = confFactory.createConfiguration(ctx);
                 try {
                     TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
+                    mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
                     Vertex.setContext(mapperContext);
                     BspUtils.setDefaultConfiguration(conf);
                 } catch (Exception e) {