Refactored Node Controller to use work queue

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@659 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java
index cd6579d..e94086f 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java
@@ -45,7 +45,7 @@
             try {
                 wait();
             } catch (InterruptedException e) {
-                e.printStackTrace();
+                throw e;
             }
         }
         if (e != null) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 2176dda..36c31cf 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -154,12 +154,14 @@
         taskMap.remove(task);
         TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId());
         task.dumpProfile(taskProfile);
-        nodeController.notifyTaskComplete(jobId, task.getTaskAttemptId(), taskProfile);
+        nodeController.getClusterController().notifyTaskComplete(jobId, task.getTaskAttemptId(),
+                nodeController.getId(), taskProfile);
     }
 
-    public synchronized void notifyTaskFailed(Task task, Exception exception) {
+    public synchronized void notifyTaskFailed(Task task, Exception exception) throws Exception {
         taskMap.remove(task);
-        nodeController.notifyTaskFailed(jobId, task.getTaskAttemptId(), exception);
+        nodeController.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), nodeController.getId(),
+                exception);
     }
 
     public NodeControllerService getNodeController() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 28cb8be..50c2c32 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -15,9 +15,6 @@
 package edu.uci.ics.hyracks.control.nc;
 
 import java.io.File;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.management.MemoryUsage;
@@ -25,12 +22,10 @@
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.rmi.registry.LocateRegistry;
 import java.rmi.registry.Registry;
 import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -44,39 +39,15 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
-
-import edu.uci.ics.hyracks.api.application.INCApplicationContext;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
-import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IActivity;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.naming.MultipartName;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
-import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
 import edu.uci.ics.hyracks.control.common.base.INodeController;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -87,16 +58,18 @@
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.work.WorkQueue;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.net.ConnectionManager;
-import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
-import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
-import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
-import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
+import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.CleanupJobWork;
+import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
+import edu.uci.ics.hyracks.control.nc.work.DestroyApplicationWork;
+import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
+import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
 
 public class NodeControllerService extends AbstractRemoteService implements INodeController {
     private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -113,6 +86,8 @@
 
     private final ConnectionManager connectionManager;
 
+    private final WorkQueue queue;
+
     private final Timer timer;
 
     private IClusterController ccs;
@@ -147,6 +122,7 @@
         partitionManager = new PartitionManager(this);
         connectionManager.setPartitionRequestListener(partitionManager);
 
+        queue = new WorkQueue();
         jobletMap = new Hashtable<JobId, Joblet>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
@@ -206,6 +182,18 @@
         return id;
     }
 
+    public ServerContext getServerContext() {
+        return serverCtx;
+    }
+
+    public Map<String, NCApplicationContext> getApplications() {
+        return applications;
+    }
+
+    public Map<JobId, Joblet> getJobletMap() {
+        return jobletMap;
+    }
+
     public ConnectionManager getConnectionManager() {
         return connectionManager;
     }
@@ -218,6 +206,63 @@
         return ccs;
     }
 
+    public NodeParameters getNodeParameters() {
+        return nodeParameters;
+    }
+
+    public Executor getExecutor() {
+        return executor;
+    }
+
+    @Override
+    public void startTasks(String appName, final JobId jobId, byte[] jagBytes,
+            List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
+        StartTasksWork stw = new StartTasksWork(this, appName, jobId, jagBytes, taskDescriptors, connectorPoliciesMap);
+        queue.scheduleAndSync(stw);
+    }
+
+    @Override
+    public void cleanUpJob(JobId jobId) throws Exception {
+        CleanupJobWork cjw = new CleanupJobWork(this, jobId);
+        queue.scheduleAndSync(cjw);
+    }
+
+    @Override
+    public void notifyRegistration(IClusterController ccs) throws Exception {
+        this.ccs = ccs;
+    }
+
+    @Override
+    public NCConfig getConfiguration() throws Exception {
+        return ncConfig;
+    }
+
+    @Override
+    public synchronized void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
+        AbortTasksWork atw = new AbortTasksWork(this, jobId, tasks);
+        queue.scheduleAndSync(atw);
+    }
+
+    @Override
+    public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+            throws Exception {
+        CreateApplicationWork caw = new CreateApplicationWork(this, appName, deployHar, serializedDistributedState);
+        queue.scheduleAndSync(caw);
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        DestroyApplicationWork daw = new DestroyApplicationWork(this, appName);
+        queue.scheduleAndSync(daw);
+    }
+
+    @Override
+    public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
+        ReportPartitionAvailabilityWork rpaw = new ReportPartitionAvailabilityWork(this, pid, networkAddress);
+        queue.scheduleAndSync(rpaw);
+    }
+
     private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
         String ipaddrStr = ncConfig.dataIPAddress;
         ipaddrStr = ipaddrStr.trim();
@@ -235,184 +280,6 @@
         return InetAddress.getByAddress(ipBytes);
     }
 
-    @Override
-    public void startTasks(String appName, final JobId jobId, byte[] jagBytes,
-            List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
-        try {
-            NCApplicationContext appCtx = applications.get(appName);
-            final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
-            Map<MultipartName, Object> ctxVarMap = (Map<MultipartName, Object>) appCtx.deserialize(ctxVarBytes);
-
-            IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
-                @Override
-                public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
-                    return plan.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
-                }
-
-                @Override
-                public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
-                    return plan.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
-                }
-            };
-
-            final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx);
-
-            for (TaskAttemptDescriptor td : taskDescriptors) {
-                TaskAttemptId taId = td.getTaskAttemptId();
-                TaskId tid = taId.getTaskId();
-                IActivity han = plan.getActivityNodeMap().get(tid.getActivityId());
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Initializing " + taId + " -> " + han);
-                }
-                final int partition = tid.getPartition();
-                Map<MultipartName, Object> inputGlobalVariables = createInputGlobalVariables(ctxVarMap, han);
-                Task task = new Task(joblet, taId, han.getClass().getName(), executor);
-                IOperatorEnvironment env = joblet.getEnvironment(tid.getActivityId().getOperatorDescriptorId(),
-                        tid.getPartition());
-                IOperatorNodePushable operator = han.createPushRuntime(task, env, rdp, partition,
-                        td.getPartitionCount());
-
-                List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
-
-                List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getActivityId());
-                if (inputs != null) {
-                    for (int i = 0; i < inputs.size(); ++i) {
-                        IConnectorDescriptor conn = inputs.get(i);
-                        IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("input: " + i + ": " + conn.getConnectorId());
-                        }
-                        RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
-                        IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
-                                recordDesc, cPolicy);
-                        collectors.add(collector);
-                    }
-                }
-                List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
-                if (outputs != null) {
-                    for (int i = 0; i < outputs.size(); ++i) {
-                        final IConnectorDescriptor conn = outputs.get(i);
-                        RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
-                        IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
-
-                        IPartitionWriterFactory pwFactory = createPartitionWriterFactory(cPolicy, jobId, conn,
-                                partition, taId);
-
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("output: " + i + ": " + conn.getConnectorId());
-                        }
-                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
-                                td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
-                        operator.setOutputFrameWriter(i, writer, recordDesc);
-                    }
-                }
-
-                task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
-                joblet.addTask(task);
-
-                task.start();
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    private Map<MultipartName, Object> createInputGlobalVariables(Map<MultipartName, Object> ctxVarMap, IActivity han) {
-        Map<MultipartName, Object> gVars = new HashMap<MultipartName, Object>();
-        //        for (MultipartName inVar : han.getInputVariables()) {
-        //            gVars.put(inVar, ctxVarMap.get(inVar));
-        //        }
-        return gVars;
-    }
-
-    private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
-            int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
-            throws HyracksDataException {
-        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
-                td.getInputPartitionCounts()[i], td.getPartitionCount());
-        if (cPolicy.materializeOnReceiveSide()) {
-            return new ReceiveSideMaterializingCollector(ctx, partitionManager, collector, task.getTaskAttemptId(),
-                    executor);
-        } else {
-            return collector;
-        }
-    }
-
-    private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final JobId jobId,
-            final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId) {
-        if (cPolicy.materializeOnSendSide()) {
-            return new IPartitionWriterFactory() {
-                @Override
-                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new MaterializedPartitionWriter(ctx, partitionManager, new PartitionId(jobId,
-                            conn.getConnectorId(), senderIndex, receiverIndex), taId, executor);
-                }
-            };
-        } else {
-            return new IPartitionWriterFactory() {
-                @Override
-                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new PipelinedPartition(partitionManager, new PartitionId(jobId, conn.getConnectorId(),
-                            senderIndex, receiverIndex), taId);
-                }
-            };
-        }
-    }
-
-    private synchronized Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx) throws Exception {
-        Joblet ji = jobletMap.get(jobId);
-        if (ji == null) {
-            ji = new Joblet(this, jobId, appCtx);
-            jobletMap.put(jobId, ji);
-        }
-        return ji;
-    }
-
-    public Executor getExecutor() {
-        return executor;
-    }
-
-    @Override
-    public void cleanUpJob(JobId jobId) throws Exception {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Cleaning up after job: " + jobId);
-        }
-        Joblet joblet = jobletMap.remove(jobId);
-        if (joblet != null) {
-            partitionManager.unregisterPartitions(jobId);
-            joblet.close();
-        }
-    }
-
-    public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, TaskProfile taskProfile) throws Exception {
-        try {
-            ccs.notifyTaskComplete(jobId, taskId, id, taskProfile);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    public void notifyTaskFailed(JobId jobId, TaskAttemptId taskId, Exception exception) {
-        try {
-            ccs.notifyTaskFailure(jobId, taskId, id, exception);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public void notifyRegistration(IClusterController ccs) throws Exception {
-        this.ccs = ccs;
-    }
-
-    @Override
-    public NCConfig getConfiguration() throws Exception {
-        return ncConfig;
-    }
-
     private class HeartbeatTask extends TimerTask {
         private IClusterController cc;
 
@@ -483,70 +350,4 @@
             }
         }
     }
-
-    @Override
-    public synchronized void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
-        }
-        Joblet ji = jobletMap.get(jobId);
-        if (ji != null) {
-            Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
-            for (TaskAttemptId taId : tasks) {
-                Task task = taskMap.get(taId);
-                if (task != null) {
-                    task.abort();
-                }
-            }
-            ji.close();
-        }
-    }
-
-    @Override
-    public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
-            throws Exception {
-        NCApplicationContext appCtx;
-        synchronized (applications) {
-            if (applications.containsKey(appName)) {
-                throw new HyracksException("Duplicate application with name: " + appName + " being created.");
-            }
-            appCtx = new NCApplicationContext(serverCtx, ctx, appName, id);
-            applications.put(appName, appCtx);
-        }
-        if (deployHar) {
-            HttpClient hc = new DefaultHttpClient();
-            HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
-                    + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
-            HttpResponse response = hc.execute(get);
-            InputStream is = response.getEntity().getContent();
-            OutputStream os = appCtx.getHarOutputStream();
-            try {
-                IOUtils.copyLarge(is, os);
-            } finally {
-                os.close();
-                is.close();
-            }
-        }
-        appCtx.initializeClassPath();
-        appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
-        appCtx.initialize();
-    }
-
-    @Override
-    public void destroyApplication(String appName) throws Exception {
-        ApplicationContext appCtx = applications.remove(appName);
-        if (appCtx != null) {
-            appCtx.deinitialize();
-        }
-    }
-
-    @Override
-    public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
-        Joblet ji = jobletMap.get(pid.getJobId());
-        if (ji != null) {
-            PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ctx, connectionManager,
-                    new InetSocketAddress(networkAddress.getIpAddress(), networkAddress.getPort()), pid, 1));
-            ji.reportPartitionAvailability(channel);
-        }
-    }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 6a73902..0cb938b 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -249,7 +249,11 @@
             joblet.notifyTaskComplete(this);
         } catch (Exception e) {
             e.printStackTrace();
-            joblet.notifyTaskFailed(this, e);
+            try {
+                joblet.notifyTaskFailed(this, e);
+            } catch (Exception e1) {
+                e1.printStackTrace();
+            }
         } finally {
             ct.setName(threadName);
             close();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
new file mode 100644
index 0000000..6549a74
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class AbortTasksWork extends SynchronizableWork {
+    private static final Logger LOGGER = Logger.getLogger(AbortTasksWork.class.getName());
+
+    private final NodeControllerService ncs;
+
+    private final JobId jobId;
+
+    private final List<TaskAttemptId> tasks;
+
+    public AbortTasksWork(NodeControllerService ncs, JobId jobId, List<TaskAttemptId> tasks) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+        this.tasks = tasks;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
+        }
+        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+        Joblet ji = jobletMap.get(jobId);
+        if (ji != null) {
+            Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
+            for (TaskAttemptId taId : tasks) {
+                Task task = taskMap.get(taId);
+                if (task != null) {
+                    task.abort();
+                }
+            }
+            ji.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
new file mode 100644
index 0000000..50c05df
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class CleanupJobWork extends SynchronizableWork {
+    private static final Logger LOGGER = Logger.getLogger(CleanupJobWork.class.getName());
+
+    private final NodeControllerService ncs;
+
+    private final JobId jobId;
+
+    public CleanupJobWork(NodeControllerService ncs, JobId jobId) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Cleaning up after job: " + jobId);
+        }
+        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+        Joblet joblet = jobletMap.remove(jobId);
+        if (joblet != null) {
+            ncs.getPartitionManager().unregisterPartitions(jobId);
+            joblet.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
new file mode 100644
index 0000000..120e376
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+public class CreateApplicationWork extends SynchronizableWork {
+    private static final Logger LOGGER = Logger.getLogger(CreateApplicationWork.class.getName());
+
+    private final NodeControllerService ncs;
+
+    private final String appName;
+
+    private final boolean deployHar;
+
+    private final byte[] serializedDistributedState;
+
+    public CreateApplicationWork(NodeControllerService ncs, String appName, boolean deployHar,
+            byte[] serializedDistributedState) {
+        this.ncs = ncs;
+        this.appName = appName;
+        this.deployHar = deployHar;
+        this.serializedDistributedState = serializedDistributedState;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        NCApplicationContext appCtx;
+        Map<String, NCApplicationContext> applications = ncs.getApplications();
+        if (applications.containsKey(appName)) {
+            throw new HyracksException("Duplicate application with name: " + appName + " being created.");
+        }
+        appCtx = new NCApplicationContext(ncs.getServerContext(), ncs.getRootContext(), appName, ncs.getId());
+        applications.put(appName, appCtx);
+        if (deployHar) {
+            NCConfig ncConfig = ncs.getConfiguration();
+            NodeParameters nodeParameters = ncs.getNodeParameters();
+            HttpClient hc = new DefaultHttpClient();
+            HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
+                    + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
+            HttpResponse response = hc.execute(get);
+            InputStream is = response.getEntity().getContent();
+            OutputStream os = appCtx.getHarOutputStream();
+            try {
+                IOUtils.copyLarge(is, os);
+            } finally {
+                os.close();
+                is.close();
+            }
+        }
+        appCtx.initializeClassPath();
+        appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
+        appCtx.initialize();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
new file mode 100644
index 0000000..6a5fbfc
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+public class DestroyApplicationWork extends SynchronizableWork {
+    private static final Logger LOGGER = Logger.getLogger(DestroyApplicationWork.class.getName());
+
+    private final NodeControllerService ncs;
+
+    private final String appName;
+
+    public DestroyApplicationWork(NodeControllerService ncs, String appName) {
+        this.ncs = ncs;
+        this.appName = appName;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        Map<String, NCApplicationContext> applications = ncs.getApplications();
+        ApplicationContext appCtx = applications.remove(appName);
+        if (appCtx != null) {
+            appCtx.deinitialize();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
new file mode 100644
index 0000000..bfa21c9
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
+
+public class ReportPartitionAvailabilityWork extends SynchronizableWork {
+    private static final Logger LOGGER = Logger.getLogger(ReportPartitionAvailabilityWork.class.getName());
+
+    private final NodeControllerService ncs;
+
+    private final PartitionId pid;
+
+    private final NetworkAddress networkAddress;
+
+    public ReportPartitionAvailabilityWork(NodeControllerService ncs, PartitionId pid, NetworkAddress networkAddress) {
+        this.ncs = ncs;
+        this.pid = pid;
+        this.networkAddress = networkAddress;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+        Joblet ji = jobletMap.get(pid.getJobId());
+        if (ji != null) {
+            PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
+                    ncs.getConnectionManager(), new InetSocketAddress(networkAddress.getIpAddress(),
+                            networkAddress.getPort()), pid, 1));
+            ji.reportPartitionAvailability(channel);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
new file mode 100644
index 0000000..ed83451
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
+import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
+import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
+
+public class StartTasksWork extends SynchronizableWork {
+    private static final Logger LOGGER = Logger.getLogger(StartTasksWork.class.getName());
+
+    private final NodeControllerService ncs;
+
+    private final String appName;
+
+    private final JobId jobId;
+
+    private final byte[] jagBytes;
+
+    private final List<TaskAttemptDescriptor> taskDescriptors;
+
+    private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap;
+
+    public StartTasksWork(NodeControllerService ncs, String appName, JobId jobId, byte[] jagBytes,
+            List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) {
+        this.ncs = ncs;
+        this.appName = appName;
+        this.jobId = jobId;
+        this.jagBytes = jagBytes;
+        this.taskDescriptors = taskDescriptors;
+        this.connectorPoliciesMap = connectorPoliciesMap;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        try {
+            Map<String, NCApplicationContext> applications = ncs.getApplications();
+            NCApplicationContext appCtx = applications.get(appName);
+            final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
+
+            IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
+                @Override
+                public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
+                    return plan.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
+                }
+
+                @Override
+                public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
+                    return plan.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
+                }
+            };
+
+            final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx);
+
+            for (TaskAttemptDescriptor td : taskDescriptors) {
+                TaskAttemptId taId = td.getTaskAttemptId();
+                TaskId tid = taId.getTaskId();
+                IActivity han = plan.getActivityNodeMap().get(tid.getActivityId());
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Initializing " + taId + " -> " + han);
+                }
+                final int partition = tid.getPartition();
+                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor());
+                IOperatorEnvironment env = joblet.getEnvironment(tid.getActivityId().getOperatorDescriptorId(),
+                        tid.getPartition());
+                IOperatorNodePushable operator = han.createPushRuntime(task, env, rdp, partition,
+                        td.getPartitionCount());
+
+                List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
+
+                List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getActivityId());
+                if (inputs != null) {
+                    for (int i = 0; i < inputs.size(); ++i) {
+                        IConnectorDescriptor conn = inputs.get(i);
+                        IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("input: " + i + ": " + conn.getConnectorId());
+                        }
+                        RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
+                        IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
+                                recordDesc, cPolicy);
+                        collectors.add(collector);
+                    }
+                }
+                List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
+                if (outputs != null) {
+                    for (int i = 0; i < outputs.size(); ++i) {
+                        final IConnectorDescriptor conn = outputs.get(i);
+                        RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
+                        IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+
+                        IPartitionWriterFactory pwFactory = createPartitionWriterFactory(cPolicy, jobId, conn,
+                                partition, taId);
+
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("output: " + i + ": " + conn.getConnectorId());
+                        }
+                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+                                td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        operator.setOutputFrameWriter(i, writer, recordDesc);
+                    }
+                }
+
+                task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
+                joblet.addTask(task);
+
+                task.start();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx) throws Exception {
+        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+        Joblet ji = jobletMap.get(jobId);
+        if (ji == null) {
+            ji = new Joblet(ncs, jobId, appCtx);
+            jobletMap.put(jobId, ji);
+        }
+        return ji;
+    }
+
+    private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
+            int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
+            throws HyracksDataException {
+        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+                td.getInputPartitionCounts()[i], td.getPartitionCount());
+        if (cPolicy.materializeOnReceiveSide()) {
+            return new ReceiveSideMaterializingCollector(ncs.getRootContext(), ncs.getPartitionManager(), collector,
+                    task.getTaskAttemptId(), ncs.getExecutor());
+        } else {
+            return collector;
+        }
+    }
+
+    private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final JobId jobId,
+            final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId) {
+        if (cPolicy.materializeOnSendSide()) {
+            return new IPartitionWriterFactory() {
+                @Override
+                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+                    return new MaterializedPartitionWriter(ncs.getRootContext(), ncs.getPartitionManager(),
+                            new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                            ncs.getExecutor());
+                }
+            };
+        } else {
+            return new IPartitionWriterFactory() {
+                @Override
+                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+                    return new PipelinedPartition(ncs.getPartitionManager(), new PartitionId(jobId,
+                            conn.getConnectorId(), senderIndex, receiverIndex), taId);
+                }
+            };
+        }
+    }
+}
\ No newline at end of file