Adapt Pregelix to the IOC changes
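
This changes the client API to carry an explicit DeploymentId: deployBinary(List<String>) now
returns the id, the startJob() overloads accept it, and unDeployBinary() releases it. A minimal
sketch of the intended client-side flow, assuming a reachable cluster controller at ccHost:ccPort
and an already-built JobSpecification jobSpec (the jar path is only illustrative):

    IHyracksClientConnection hcc = new HyracksConnection(ccHost, ccPort);
    DeploymentId deploymentId = hcc.deployBinary(Collections.singletonList("/path/to/job.jar"));
    JobId jobId = hcc.startJob(deploymentId, jobSpec);
    hcc.waitForCompletion(jobId);
    hcc.unDeployBinary(deploymentId);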

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3379 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 8f1c401..36ceb18 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -91,13 +91,6 @@
     }
 
     @Override
-    public DeploymentId deployBinary(List<URL> binaryURLs) throws Exception {
-        HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(
-                binaryURLs, null);
-        return (DeploymentId) rpci.call(ipcHandle, dbf);
-    }
-
-    @Override
     public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception {
         HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(
                 binaryURLs, deploymentId);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 93bb88f..2f02780 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -21,6 +21,7 @@
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
@@ -30,6 +31,7 @@
 
 import edu.uci.ics.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -120,12 +122,17 @@
     }
 
     @Override
-    public void deployBinary(List<String> jars) throws Exception {
+    public DeploymentId deployBinary(List<String> jars) throws Exception {
+        DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString());
         List<URL> binaryURLs = new ArrayList<URL>();
         if (jars != null && jars.size() > 0) {
             HttpClient hc = new DefaultHttpClient();
             for (String jar : jars) {
-                String url = "http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/" + jar;
+                int slashIndex = jar.lastIndexOf('/');
+                String fileName = jar.substring(slashIndex + 1);
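+                // Encode the jar as "<deploymentId>&<fileName>" in the URL path; the CC-side
+                // ApplicationInstallationHandler splits it back into deployment id and file name.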
+                String url = "http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/"
+                        + deploymentId.toString() + "&" + fileName;
+                System.out.print("put URL: " + url);
                 HttpPut put = new HttpPut(url);
                 put.setEntity(new FileEntity(new File(jar), "application/octet-stream"));
                 HttpResponse response = hc.execute(put);
@@ -135,6 +142,31 @@
                 binaryURLs.add(new URL(url));
             }
         }
-        hci.deployBinary(binaryURLs);
+        hci.deployBinary(binaryURLs, deploymentId);
+        return deploymentId;
+    }
+
+    @Override
+    public void unDeployBinary(DeploymentId deploymentId) throws Exception {
+        hci.unDeployBinary(deploymentId);
+    }
+
+    @Override
+    public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception {
+        return startJob(deploymentId, jobSpec, EnumSet.noneOf(JobFlag.class));
+    }
+
+    @Override
+    public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
+            throws Exception {
+        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
+                jobSpec);
+        return startJob(deploymentId, jsacggf, jobFlags);
+    }
+
+    @Override
+    public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
+            EnumSet<JobFlag> jobFlags) throws Exception {
+        return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 6e4dc73..41b07d7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -19,6 +19,7 @@
 import java.util.Map;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -118,5 +119,53 @@
      * @param jars
      *            a list of user-defined jars
      */
-    public void deployBinary(List<String> jars) throws Exception;
+    public DeploymentId deployBinary(List<String> jars) throws Exception;
+
+    /**
+     * Undeploy a certain deployment.
+     * 
+     * @param deploymentId
+     *            the id of the deployment to be removed
+     */
+    public void unDeployBinary(DeploymentId deploymentId) throws Exception;
+
+    /**
+     * Start the specified Job.
+     * 
+     * @param deploymentId
+     *            the id of the specific deployment
+     * @param jobSpec
+     *            Job Specification
+     * @throws Exception
+     */
+    public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
+
+    /**
+     * Start the specified Job.
+     * 
+     * @param deploymentId
+     *            the id of the specific deployment
+     * @param jobSpec
+     *            Job Specification
+     * @param jobFlags
+     *            Flags
+     * @throws Exception
+     */
+    public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
+            throws Exception;
+
+    /**
+     * Start the specified Job.
+     * 
+     * @param deploymentId
+     *            the id of the specific deployment
+     * @param acggf
+     *            Activity Cluster Graph Generator Factory
+     * @param jobFlags
+     *            Flags
+     * @throws Exception
+     */
+    public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
+            EnumSet<JobFlag> jobFlags) throws Exception;
+
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 9c0ba3b..aabc351 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -41,8 +41,6 @@
 
     public ClusterTopology getClusterTopology() throws Exception;
 
-    public DeploymentId deployBinary(List<URL> binaryURLs) throws Exception;
-
     public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception;
 
     public void unDeployBinary(DeploymentId deploymentId) throws Exception;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
index fad4300..f1cfee8 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
@@ -28,4 +28,6 @@
     public ICounterContext getCounterContext();
 
     public Object getGlobalJobData();
+    
+    public Class<?> loadClass(String className);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
index d3188d6..1e3f0ad 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
@@ -27,6 +27,8 @@
 
     public byte[] serialize(Serializable job) throws HyracksException;
 
+    public Class<?> loadClass(String className) throws HyracksException;
+
     public void addClassPathURLs(List<URL> binaryURLs) throws HyracksException;
 
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
index b7e7920..59571c2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
@@ -47,4 +47,13 @@
         throw new UnsupportedOperationException("Not supported by " + this.getClass().getName());
     }
 
+    @Override
+    public Class<?> loadClass(String className) throws HyracksException {
+        try {
+            return this.getClass().getClassLoader().loadClass(className);
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
index ed26304..35a1e8b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
@@ -31,9 +31,6 @@
             return defaultJobSerDe;
         }
         IJobSerializerDeserializer jobSerDe = jobSerializerDeserializerMap.get(deploymentId);
-        if (jobSerDe == null) {
-            return defaultJobSerDe;
-        }
         return jobSerDe;
     }
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
index cb90f4c..a3c8aed 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -68,6 +68,10 @@
         }
     }
 
+    public static Class<?> loadClass(String className) throws IOException, ClassNotFoundException {
+        return Class.forName(className);
+    }
+
     private static class ClassLoaderObjectInputStream extends ObjectInputStream {
         private ClassLoader classLoader;
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
new file mode 100755
index 0000000..40a21c9
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.web;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class ApplicationInstallationHandler extends AbstractHandler {
+    private ClusterControllerService ccs;
+
+    public ApplicationInstallationHandler(ClusterControllerService ccs) {
+        this.ccs = ccs;
+    }
+
+    @Override
+    public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+            throws IOException, ServletException {
+        try {
+            while (target.startsWith("/")) {
+                target = target.substring(1);
+            }
+            while (target.endsWith("/")) {
+                target = target.substring(0, target.length() - 1);
+            }
+            String[] parts = target.split("/");
+            if (parts.length != 1) {
+                return;
+            }
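+            // The single path segment has the form "<deploymentId>&<fileName>", as constructed
+            // by HyracksConnection.deployBinary() on the client side.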
+            final String[] params = parts[0].split("&");
+            String deployIdString = params[0];
+            String rootDir = ccs.getServerContext().getBaseDir().toString();
+            final String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + "applications" + File.separator
+                    + deployIdString : rootDir + File.separator + "applications" + File.separator + deployIdString;
+            if (HttpMethods.PUT.equals(request.getMethod())) {
+                class OutputStreamGetter extends SynchronizableWork {
+                    private OutputStream os;
+
+                    @Override
+                    protected void doRun() throws Exception {
+                        FileUtils.forceMkdir(new File(deploymentDir));
+                        String fileName = params[1];
+                        File jarFile = new File(deploymentDir, fileName);
+                        os = new FileOutputStream(jarFile);
+                        System.out.println("client put: " + jarFile.getAbsolutePath());
+                    }
+                }
+                OutputStreamGetter r = new OutputStreamGetter();
+                try {
+                    ccs.getWorkQueue().scheduleAndSync(r);
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+                try {
+                    IOUtils.copyLarge(request.getInputStream(), r.os);
+                } finally {
+                    r.os.close();
+                }
+            } else if (HttpMethods.GET.equals(request.getMethod())) {
+                class InputStreamGetter extends SynchronizableWork {
+                    private InputStream is;
+
+                    @Override
+                    protected void doRun() throws Exception {
+                        String fileName = params[1];
+                        File jarFile = new File(deploymentDir, fileName);
+                        is = new FileInputStream(jarFile);
+                        System.out.println("client request: " + jarFile.getAbsolutePath());
+                    }
+                }
+                InputStreamGetter r = new InputStreamGetter();
+                try {
+                    ccs.getWorkQueue().scheduleAndSync(r);
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+                if (r.is == null) {
+                    response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+                } else {
+                    response.setContentType("application/octet-stream");
+                    response.setStatus(HttpServletResponse.SC_OK);
+                    try {
+                        IOUtils.copyLarge(r.is, response.getOutputStream());
+                    } finally {
+                        r.is.close();
+                    }
+                }
+            }
+            baseRequest.setHandled(true);
+        } catch (IOException e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
index e39e766..c32cb97 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
@@ -70,6 +70,10 @@
 
         addHandler(createAdminConsoleHandler());
         addHandler(createStaticResourcesHandler());
+
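+        // Serve jar uploads (PUT) and downloads (GET) for dynamic deployment under /applications.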
+        handler = new ContextHandler("/applications");
+        handler.setHandler(new ApplicationInstallationHandler(ccs));
+        addHandler(handler);
     }
 
     private Handler createAdminConsoleHandler() {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
index a4f34af..fa44633 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -27,10 +27,10 @@
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.common.work.IPCResponder;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
-public class CliDeployBinaryWork extends AbstractWork {
+public class CliDeployBinaryWork extends SynchronizableWork {
 
     private ClusterControllerService ccs;
     private List<URL> binaryURLs;
@@ -42,10 +42,11 @@
         this.ccs = ncs;
         this.binaryURLs = binaryURLs;
         this.deploymentId = deploymentId;
+        this.callback = callback;
     }
 
     @Override
-    public void run() {
+    public void doRun() {
         try {
             if (deploymentId == null) {
                 deploymentId = new DeploymentId(UUID.randomUUID().toString());
@@ -54,7 +55,7 @@
              * Deploy for the cluster controller
              */
             DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getApplicationContext()
-                    .getJobSerializerDeserializerContainer(), ccs.getServerContext());
+                    .getJobSerializerDeserializerContainer(), ccs.getServerContext(), false);
 
             /**
              * Deploy for the node controllers
@@ -65,7 +66,7 @@
             for (String nc : nodeControllerStateMap.keySet()) {
                 nodeIds.add(nc);
             }
-            DeploymentRun dRun = new DeploymentRun(nodeIds);
+            final DeploymentRun dRun = new DeploymentRun(nodeIds);
             ccs.addDeploymentRun(deploymentId, dRun);
 
             /***
@@ -75,12 +76,21 @@
                 ncs.getNodeController().deployBinary(deploymentId, binaryURLs);
             }
 
-            /**
-             * wait for completion
-             */
-            dRun.waitForCompletion();
-            ccs.removeDeploymentRun(deploymentId);
-            callback.setValue(deploymentId);
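+            /**
+             * Wait for the node controllers asynchronously on the CC executor, so this
+             * work item does not block the work queue while the deployment completes.
+             */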
+            ccs.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        /**
+                         * wait for completion
+                         */
+                        dRun.waitForCompletion();
+                        ccs.removeDeploymentRun(deploymentId);
+                        callback.setValue(deploymentId);
+                    } catch (Exception e) {
+                        callback.setException(e);
+                    }
+                }
+            });
         } catch (Exception e) {
             callback.setException(e);
         }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
index 584ab7c..d7e5df8 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
@@ -46,7 +46,8 @@
             /**
              * Deploy for the cluster controller
              */
-            DeploymentUtils.undeploy(deploymentId, ccs.getApplicationContext().getJobSerializerDeserializerContainer());
+            DeploymentUtils.undeploy(deploymentId, ccs.getApplicationContext().getJobSerializerDeserializerContainer(),
+                    ccs.getServerContext());
 
             /**
              * Deploy for the node controllers
@@ -66,7 +67,7 @@
             for (NodeControllerState ncs : nodeControllerStateMap.values()) {
                 ncs.getNodeController().undeployBinary(deploymentId);
             }
-            
+
             /**
              * wait for completion
              */
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index bc8c314..b3550bf 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -32,7 +32,7 @@
     @Override
     protected void performEvent(TaskAttempt ta) {
         JobRun run = ccs.getActiveRunMap().get(jobId);
-        ccs.getDatasetDirectoryService().reportJobFailure(jobId);
+        //ccs.getDatasetDirectoryService().reportJobFailure(jobId);
         ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
         run.getScheduler().notifyTaskFailure(ta, ac, details);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index 0e4d366..7a0340a 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -10,7 +10,6 @@
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializer;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 
 public class ClassLoaderJobSerializerDeserializer implements IJobSerializerDeserializer {
@@ -18,12 +17,12 @@
     private URLClassLoader classLoader;
 
     @Override
-    public JobSpecification deserialize(byte[] jsBytes) throws HyracksException {
+    public Object deserialize(byte[] jsBytes) throws HyracksException {
         try {
             if (classLoader == null) {
-                return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes);
+                return JavaSerializationUtils.deserialize(jsBytes);
             }
-            return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes, classLoader);
+            return JavaSerializationUtils.deserialize(jsBytes, classLoader);
         } catch (Exception e) {
             throw new HyracksException(e);
         }
@@ -53,6 +52,9 @@
             if (classLoader == null) {
                 /** create a new classloader */
                 URL[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]);
+                for (URL url : urls) {
+                    System.out.println("Class loader url " + url);
+                }
                 classLoader = new URLClassLoader(urls, this.getClass().getClassLoader());
             } else {
                 /** add URLs to the existing classloader */
@@ -65,4 +67,13 @@
             throw new HyracksException(e);
         }
     }
+
+    @Override
+    public Class<?> loadClass(String className) throws HyracksException {
+        try {
+            return classLoader.loadClass(className);
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index ab84065..e95b85f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -1,11 +1,19 @@
 package edu.uci.ics.hyracks.control.common.deployment;
 
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
 
 import edu.uci.ics.hyracks.api.application.IApplicationContext;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -17,24 +25,35 @@
 
 public class DeploymentUtils {
 
-    private static final String DEPLOYMENT = "deployment";
+    private static final String DEPLOYMENT = "applications";
 
-    public static void undeploy(DeploymentId deploymentId, IJobSerializerDeserializerContainer container)
-            throws HyracksException {
+    public static void undeploy(DeploymentId deploymentId, IJobSerializerDeserializerContainer container,
+            ServerContext ctx) throws HyracksException {
         container.removeJobSerializerDeserializer(deploymentId);
+        String rootDir = ctx.getBaseDir().toString();
+        String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT + File.separator + deploymentId
+                : rootDir + File.separator + DEPLOYMENT + File.separator + deploymentId;
+        try {
+            File dFile = new File(deploymentDir);
+            if (dFile.exists()) {
+                FileUtils.forceDelete(dFile);
+            }
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
     }
 
     public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
-            ServerContext ctx) throws HyracksException {
+            ServerContext ctx, boolean isNC) throws HyracksException {
         IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeerializer(deploymentId);
         if (jobSerDe == null) {
             jobSerDe = new ClassLoaderJobSerializerDeserializer();
             container.addJobSerializerDeserializer(deploymentId, jobSerDe);
         }
         String rootDir = ctx.getBaseDir().toString();
-        String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT : rootDir + File.separator
-                + DEPLOYMENT;
-        jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir));
+        String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT + File.separator + deploymentId
+                : rootDir + File.separator + DEPLOYMENT + File.separator + deploymentId;
+        jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC));
     }
 
     public static Object deserialize(byte[] bytes, DeploymentId deploymentId, IApplicationContext appCtx)
@@ -50,7 +69,21 @@
         }
     }
 
-    private static List<URL> downloadURLs(List<URL> urls, String deploymentDir) throws HyracksException {
+    public static Class<?> loadClass(String className, DeploymentId deploymentId, IApplicationContext appCtx)
+            throws HyracksException {
+        try {
+            IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
+            IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
+                    .getJobSerializerDeerializer(deploymentId);
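+            // Fall back to the default class loader when no deployment-specific
+            // serializer/deserializer is registered for this deployment.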
+            Class<?> cl = jobSerDe == null ? JavaSerializationUtils.loadClass(className) : jobSerDe
+                    .loadClass(className);
+            return cl;
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
         try {
             List<URL> downloadedFileURLs = new ArrayList<URL>();
             File dir = new File(deploymentDir);
@@ -60,10 +93,23 @@
             for (URL url : urls) {
                 String urlString = url.toString();
                 int slashIndex = urlString.lastIndexOf('/');
-                String fileName = urlString.substring(slashIndex + 1);
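+                // The last URL segment is "<deploymentId>&<fileName>"; keep only the jar file name.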
+                String fileName = urlString.substring(slashIndex + 1).split("&")[1];
                 String filePath = deploymentDir + File.separator + fileName;
                 File targetFile = new File(filePath);
-                FileUtils.copyURLToFile(url, targetFile);
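+                // Only node controllers fetch the jar over HTTP; the CC already has it in its
+                // web-served applications directory.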
+                if (isNC) {
+                    HttpClient hc = new DefaultHttpClient();
+                    System.out.println(url.toString());
+                    HttpGet get = new HttpGet(url.toString());
+                    HttpResponse response = hc.execute(get);
+                    InputStream is = response.getEntity().getContent();
+                    OutputStream os = new FileOutputStream(targetFile);
+                    try {
+                        IOUtils.copyLarge(is, os);
+                    } finally {
+                        os.close();
+                        is.close();
+                    }
+                }
                 downloadedFileURLs.add(targetFile.toURI().toURL());
             }
             return downloadedFileURLs;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 3855b4d..8ac17d9 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -43,6 +44,7 @@
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
 import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
@@ -58,6 +60,8 @@
 
     private final INCApplicationContext appCtx;
 
+    private final DeploymentId deploymentId;
+
     private final JobId jobId;
 
     private final ActivityClusterGraph acg;
@@ -86,10 +90,11 @@
 
     private boolean cleanupPending;
 
-    public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx,
-            ActivityClusterGraph acg) {
+    public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
+            INCApplicationContext appCtx, ActivityClusterGraph acg) {
         this.nodeController = nodeController;
         this.appCtx = appCtx;
+        this.deploymentId = deploymentId;
         this.jobId = jobId;
         this.frameSize = acg.getFrameSize();
         this.acg = acg;
@@ -283,4 +288,13 @@
             e.printStackTrace();
         }
     }
+
+    @Override
+    public Class<?> loadClass(String className) {
+        try {
+            return DeploymentUtils.loadClass(className, deploymentId, appCtx);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
index eecf67f..4f613a1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
@@ -26,7 +26,7 @@
     public void run() {
         try {
             DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getApplicationContext()
-                    .getJobSerializerDeserializerContainer(), ncs.getServerContext());
+                    .getJobSerializerDeserializerContainer(), ncs.getServerContext(), true);
             IClusterController ccs = ncs.getClusterController();
             ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
         } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 939276a..2c6d38e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -89,7 +89,7 @@
     public void run() {
         try {
             NCApplicationContext appCtx = ncs.getApplicationContext();
-            final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, acgBytes == null ? null
+            final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes == null ? null
                     : (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx));
             final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
 
@@ -168,15 +168,15 @@
         }
     }
 
-    private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, ActivityClusterGraph acg)
-            throws Exception {
+    private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx,
+            ActivityClusterGraph acg) throws Exception {
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
             if (acg == null) {
                 throw new NullPointerException("JobActivityGraph was null");
             }
-            ji = new Joblet(ncs, jobId, appCtx, acg);
+            ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
             jobletMap.put(jobId, ji);
         }
         return ji;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java
index 38a764b..bd77fb5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java
@@ -20,7 +20,8 @@
     @Override
     public void run() {
         try {
-            DeploymentUtils.undeploy(deploymentId, ncs.getApplicationContext().getJobSerializerDeserializerContainer());
+            DeploymentUtils.undeploy(deploymentId, ncs.getApplicationContext().getJobSerializerDeserializerContainer(),
+                    ncs.getServerContext());
             IClusterController ccs = ncs.getClusterController();
             ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
         } catch (Exception e) {
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
index 7612db9..e4d94ff 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
@@ -92,4 +92,13 @@
     public Object getGlobalJobData() {
         return null;
     }
+
+    @Override
+    public Class<?> loadClass(String className) {
+        try {
+            return Class.forName(className);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 72256f9..d1a665c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -22,9 +22,6 @@
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,6 +30,7 @@
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -45,7 +43,6 @@
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.util.Utilities;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 
 @SuppressWarnings("rawtypes")
@@ -54,9 +51,7 @@
     private JobGen jobGen;
     private boolean profiling;
 
-    private String applicationName;
     private IHyracksClientConnection hcc;
-
     private Class exampleClass;
 
     public Driver(Class exampleClass) {
@@ -71,7 +66,6 @@
     @Override
     public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
             throws HyracksException {
-        applicationName = exampleClass.getSimpleName() + UUID.randomUUID();
         try {
             /** add hadoop configurations */
             URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
@@ -121,13 +115,13 @@
             for (URL url : urls)
                 if (url.toString().endsWith(".jar"))
                     jars.add(new File(url.getPath()));
-            installApplication(jars);
+            DeploymentId deploymentId = installApplication(jars);
 
             start = System.currentTimeMillis();
             FileSystem dfs = FileSystem.get(job.getConfiguration());
             dfs.delete(FileOutputFormat.getOutputPath(job), true);
-            runCreate(jobGen);
-            runDataLoad(jobGen);
+            runCreate(deploymentId, jobGen);
+            runDataLoad(deploymentId, jobGen);
             end = System.currentTimeMillis();
             time = end - start;
             LOG.info("data loading finished " + time + "ms");
@@ -135,7 +129,7 @@
             boolean terminate = false;
             do {
                 start = System.currentTimeMillis();
-                runLoopBodyIteration(jobGen, i);
+                runLoopBodyIteration(deploymentId, jobGen, i);
                 end = System.currentTimeMillis();
                 time = end - start;
                 LOG.info("iteration " + i + " finished " + time + "ms");
@@ -145,8 +139,8 @@
             } while (!terminate);
 
             start = System.currentTimeMillis();
-            runHDFSWRite(jobGen);
-            runCleanup(jobGen);
+            runHDFSWRite(deploymentId, jobGen);
+            runCleanup(deploymentId, jobGen);
             end = System.currentTimeMillis();
             time = end - start;
             LOG.info("result writing finished " + time + "ms");
@@ -156,79 +150,74 @@
         }
     }
 
-    private void runCreate(JobGen jobGen) throws Exception {
+    private void runCreate(DeploymentId deploymentId, JobGen jobGen) throws Exception {
         try {
             JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
-            execute(treeCreateSpec);
+            execute(deploymentId, treeCreateSpec);
         } catch (Exception e) {
             throw e;
         }
     }
 
-    private void runDataLoad(JobGen jobGen) throws Exception {
+    private void runDataLoad(DeploymentId deploymentId, JobGen jobGen) throws Exception {
         try {
             JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
-            execute(bulkLoadJobSpec);
+            execute(deploymentId, bulkLoadJobSpec);
         } catch (Exception e) {
             throw e;
         }
     }
 
-    private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
+    private void runLoopBodyIteration(DeploymentId deploymentId, JobGen jobGen, int iteration) throws Exception {
         try {
             JobSpecification loopBody = jobGen.generateJob(iteration);
-            execute(loopBody);
+            execute(deploymentId, loopBody);
         } catch (Exception e) {
             throw e;
         }
     }
 
-    private void runHDFSWRite(JobGen jobGen) throws Exception {
+    private void runHDFSWRite(DeploymentId deploymentId, JobGen jobGen) throws Exception {
         try {
             JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
-            execute(scanSortPrintJobSpec);
+            execute(deploymentId, scanSortPrintJobSpec);
         } catch (Exception e) {
             throw e;
         }
     }
 
-    private void runCleanup(JobGen jobGen) throws Exception {
+    private void runCleanup(DeploymentId deploymentId, JobGen jobGen) throws Exception {
         try {
             JobSpecification[] cleanups = jobGen.generateCleanup();
-            runJobArray(cleanups);
+            runJobArray(deploymentId, cleanups);
         } catch (Exception e) {
             throw e;
         }
     }
 
-    private void runJobArray(JobSpecification[] jobs) throws Exception {
+    private void runJobArray(DeploymentId deploymentId, JobSpecification[] jobs) throws Exception {
         for (JobSpecification job : jobs) {
-            execute(job);
+            execute(deploymentId, job);
         }
     }
 
-    private void execute(JobSpecification job) throws Exception {
+    private void execute(DeploymentId deploymentId, JobSpecification job) throws Exception {
         job.setUseConnectorPolicyForScheduling(false);
-        JobId jobId = hcc
-                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        JobId jobId = hcc.startJob(deploymentId, job,
+                profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
         hcc.waitForCompletion(jobId);
     }
 
-    public void installApplication(List<File> jars) throws Exception {
-        Set<String> allJars = new TreeSet<String>();
+    public DeploymentId installApplication(List<File> jars) throws Exception {
+        List<String> allJars = new ArrayList<String>();
         for (File jar : jars) {
             allJars.add(jar.getAbsolutePath());
         }
         long start = System.currentTimeMillis();
-        File appZip = Utilities.getHyracksArchive(applicationName, allJars);
+        DeploymentId deploymentId = hcc.deployBinary(allJars);
         long end = System.currentTimeMillis();
-        LOG.info("jar packing finished " + (end - start) + "ms");
-
-        start = System.currentTimeMillis();
-        // TODO: Fix this step to use Yarn
-        //hcc.createApplication(applicationName, appZip);
-        end = System.currentTimeMillis();
         LOG.info("jar deployment finished " + (end - start) + "ms");
+        return deploymentId;
     }
 }
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
index d1d927d..145169e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.pregelix.core.runtime.touchpoint;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -29,9 +30,9 @@
     }
 
     @Override
-    public RecordDescriptor createRecordDescriptor() throws HyracksDataException {
+    public RecordDescriptor createRecordDescriptor(IHyracksTaskContext ctx) throws HyracksDataException {
         try {
-            return DataflowUtils.getRecordDescriptorFromWritableClasses(fieldClasses);
+            return DataflowUtils.getRecordDescriptorFromWritableClasses(ctx, fieldClasses);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
         }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index bcf3ffc..25ad28a 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -17,6 +17,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -40,9 +41,10 @@
             throws HyracksException {
         RecordDescriptor recordDescriptor = null;
         try {
+            ClassLoader loader = DataflowUtils.class.getClassLoader();
             recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                    (Class<? extends Writable>) Class.forName(className1),
-                    (Class<? extends Writable>) Class.forName(className2));
+                    (Class<? extends Writable>) loader.loadClass(className1),
+                    (Class<? extends Writable>) loader.loadClass(className2));
         } catch (ClassNotFoundException cnfe) {
             throw new HyracksException(cnfe);
         }
@@ -53,11 +55,12 @@
     public static RecordDescriptor getRecordDescriptorFromWritableClasses(String... classNames) throws HyracksException {
         RecordDescriptor recordDescriptor = null;
         ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
+        ClassLoader loader = DataflowUtils.class.getClassLoader();
         try {
             int i = 0;
             for (String className : classNames)
-                serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) Class
-                        .forName(className));
+                serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) loader
+                        .loadClass(className));
         } catch (ClassNotFoundException cnfe) {
             throw new HyracksException(cnfe);
         }
@@ -79,4 +82,35 @@
                 new IAggregateFunctionFactory[] { aggFuncFactory });
         return aggregatorFactory;
     }
+
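+    // The ctx-aware variants below load Writable classes via the joblet context, so they
+    // resolve against the deployment-specific class loader instead of the system class loader.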
+    @SuppressWarnings("unchecked")
+    public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(IHyracksTaskContext ctx, String className1,
+            String className2) throws HyracksException {
+        RecordDescriptor recordDescriptor = null;
+        try {
+            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) ctx
+                    .getJobletContext().loadClass(className1), (Class<? extends Writable>) ctx.getJobletContext()
+                    .loadClass(className2));
+        } catch (Exception cnfe) {
+            throw new HyracksException(cnfe);
+        }
+        return recordDescriptor;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public static RecordDescriptor getRecordDescriptorFromWritableClasses(IHyracksTaskContext ctx, String... classNames)
+            throws HyracksException {
+        RecordDescriptor recordDescriptor = null;
+        ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
+        try {
+            int i = 0;
+            for (String className : classNames)
+                serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) ctx
+                        .getJobletContext().loadClass(className));
+        } catch (Exception cnfe) {
+            throw new HyracksException(cnfe);
+        }
+        recordDescriptor = new RecordDescriptor(serdes);
+        return recordDescriptor;
+    }
 }
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc b/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc
new file mode 100755
index 0000000..461d76a
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc
@@ -0,0 +1,117 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+#  Copyright 2001-2006 The Apache Software Foundation.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+# ----------------------------------------------------------------------------
+#
+#   Copyright (c) 2001-2006 The Apache Software Foundation.  All rights
+#   reserved.
+
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+  ls=`ls -ld "$PRG"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+PRGDIR=`dirname "$PRG"`
+BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
+
+
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  Darwin*) darwin=true
+           if [ -z "$JAVA_VERSION" ] ; then
+             JAVA_VERSION="CurrentJDK"
+           else
+             echo "Using Java version: $JAVA_VERSION"
+           fi
+           if [ -z "$JAVA_HOME" ] ; then
+             JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
+           fi
+           ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=`java-config --jre-home`
+  fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# If a specific java binary isn't specified search for the standard 'java' binary
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD=`which java`
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." 1>&2
+  echo "  We cannot execute $JAVACMD" 1>&2
+  exit 1
+fi
+
+if [ -z "$REPO" ]
+then
+  REPO="$BASEDIR"/lib
+fi
+
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
+
+for f in ${REPO}/*.jar; do
+  CLASSPATH=${CLASSPATH}:$f;
+done
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+  [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
+  [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
+  [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
+fi
+
+exec "$JAVACMD" $JAVA_OPTS  \
+  -classpath "$CLASSPATH" \
+  -Dapp.name="pregelixcc" \
+  -Dapp.pid="$$" \
+  -Dapp.repo="$REPO" \
+  -Dapp.home="$BASEDIR" \
+  -Dbasedir="$BASEDIR" \
+  edu.uci.ics.hyracks.control.cc.CCDriver \
+  "$@"
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc b/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc
new file mode 100755
index 0000000..4a6ca1f
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc
@@ -0,0 +1,118 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+#  Copyright 2001-2006 The Apache Software Foundation.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+# ----------------------------------------------------------------------------
+#
+#   Copyright (c) 2001-2006 The Apache Software Foundation.  All rights
+#   reserved.
+
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+  ls=`ls -ld "$PRG"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+PRGDIR=`dirname "$PRG"`
+BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
+
+
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  Darwin*) darwin=true
+           if [ -z "$JAVA_VERSION" ] ; then
+             JAVA_VERSION="CurrentJDK"
+           else
+             echo "Using Java version: $JAVA_VERSION"
+           fi
+           if [ -z "$JAVA_HOME" ] ; then
+             JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
+           fi
+           ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=`java-config --jre-home`
+  fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# If a specific java binary isn't specified search for the standard 'java' binary
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD=`which java`
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." 1>&2
+  echo "  We cannot execute $JAVACMD" 1>&2
+  exit 1
+fi
+
+if [ -z "$REPO" ]
+then
+  REPO="$BASEDIR"/lib
+fi
+
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
+
+for f in ${REPO}/*.jar; do
+  CLASSPATH=${CLASSPATH}:$f;
+done
+
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+  [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`2
+  [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
+  [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
+fi
+
+exec "$JAVACMD" $JAVA_OPTS  \
+  -classpath "$CLASSPATH" \
+  -Dapp.name="pregelixnc" \
+  -Dapp.pid="$$" \
+  -Dapp.repo="$REPO" \
+  -Dapp.home="$BASEDIR" \
+  -Dbasedir="$BASEDIR" \
+  edu.uci.ics.hyracks.control.nc.NCDriver \
+  -app-nc-main-class edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint "$@"
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
index 133b604..74f0987 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -20,12 +20,15 @@
 export JAVA_HOME=$JAVA_HOME
 export JAVA_OPTS=$CCJAVA_OPTS
 
+PREGELIX_HOME=`pwd`
 
-chmod -R 755 $HYRACKS_HOME
+#Enter the temp dir
+cd $CCTMP_DIR
+
 if [ -f "conf/topology.xml"  ]; then
 #Launch hyracks cc script with topology
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
+${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
 else
 #Launch hyracks cc script without topology
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
 fi
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
index b059aad..3ab1b25 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -39,11 +39,10 @@
 #Set JAVA_OPTS
 export JAVA_OPTS=$NCJAVA_OPTS
 
-cd $HYRACKS_HOME
-HYRACKS_HOME=`pwd`
+PREGELIX_HOME=`pwd`
 
 #Enter the temp dir
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR  -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+${PREGELIX_HOME}/bin/pregelixnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR  -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh
index c2f525a..84f369f 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh
@@ -2,7 +2,17 @@
 . conf/cluster.properties
 
 #Kill process
-PID=`ps -ef|grep ${USER}|grep java|grep hyracks|awk '{print $2}'`
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=pregelixcc'|awk '{print $2}'`
+
+if [ "$PID" == "" ]; then
+    PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
+    USERID=`id | sed 's/^uid=//;s/(.*$//'`
+    PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=pregelixcc'|awk '{print $2}'`
+fi
+
 echo $PID
 kill -9 $PID
 
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
index 35c4794..31de984 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
@@ -2,7 +2,7 @@
 . conf/cluster.properties
 
 #Kill process
-PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=pregelixnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
   PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
@@ -10,7 +10,7 @@
 
 if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
-  PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+  PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=pregelixnc'|awk '{print $2}'`
 fi
 
 echo $PID
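
The ps/grep/awk pipelines above are kept as-is for portability. Where pgrep is available, an equivalent lookup (a hypothetical alternative, not part of this patch) could be:

    # hypothetical pgrep-based variant; matches the full command line for the app.name property
    PID=`pgrep -u ${USER} -f 'Dapp.name=pregelixnc'`
    [ -n "$PID" ] && kill -9 $PID
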
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java
index e7de650..7454f2d 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java
@@ -16,11 +16,12 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface IRecordDescriptorFactory extends Serializable {
 
-    public RecordDescriptor createRecordDescriptor() throws HyracksDataException;
+    public RecordDescriptor createRecordDescriptor(IHyracksTaskContext ctx) throws HyracksDataException;
 
 }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index 4cbd6c4..24d28b0 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -66,7 +66,7 @@
             @Override
             public void open() throws HyracksDataException {
                 rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
-                        : inputRdFactory.createRecordDescriptor();
+                        : inputRdFactory.createRecordDescriptor(ctx);
                 frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
                 ctxCL = Thread.currentThread().getContextClassLoader();
                 Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 99bca1a..f75dab2 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -56,7 +56,7 @@
      * @throws HyracksDataException
      */
     public void functionOpen() throws HyracksDataException {
-        inputRd = inputRdFactory.createRecordDescriptor();
+        inputRd = inputRdFactory.createRecordDescriptor(ctx);
         tupleDe = new TupleDeserializer(inputRd);
         ctxCL = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
index eda7754..62aeeea 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
@@ -60,7 +60,7 @@
             @SuppressWarnings("rawtypes")
             private GlobalAggregator aggregator = BspUtils.createGlobalAggregator(conf);
             private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
-                    inputRdFactory.createRecordDescriptor());
+                    inputRdFactory.createRecordDescriptor(ctx));
             private ByteBufferInputStream inputStream = new ByteBufferInputStream();
             private DataInput input = new DataInputStream(inputStream);
             private Writable partialAggregateValue = BspUtils.createFinalAggregateValue(conf);
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 2402748..8bc91c2 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -75,7 +75,7 @@
             @Override
             public void open() throws HyracksDataException {
                 rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
-                        : inputRdFactory.createRecordDescriptor();
+                        : inputRdFactory.createRecordDescriptor(ctx);
                 frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
                 ctxCL = Thread.currentThread().getContextClassLoader();
                 Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
index d7cbb3a..5da0239 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
@@ -64,7 +64,7 @@
             @Override
             public void open() throws HyracksDataException {
                 rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
-                        : inputRdFactory.createRecordDescriptor();
+                        : inputRdFactory.createRecordDescriptor(ctx);
                 frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
                 try {
                     outputWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(splits[partition]