Merged fullstack_staging branch into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
new file mode 100644
index 0000000..c92ff27
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -0,0 +1,53 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-control-nc</artifactId>
+  <name>hyracks-control-nc</name>
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks-control</artifactId>
+    <version>0.2.2-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.dcache</groupId>
+  		<artifactId>dcache-client</artifactId>
+  		<version>0.0.1</version>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-control-common</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-net</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  	</dependency>
+  </dependencies>
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <version>2.0.1</version>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
new file mode 100644
index 0000000..3855b4d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.IGlobalJobDataFactory;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+
+public class Joblet implements IHyracksJobletContext, ICounterContext {
+    private final NodeControllerService nodeController;
+
+    private final INCApplicationContext appCtx;
+
+    private final JobId jobId;
+
+    private final ActivityClusterGraph acg;
+
+    private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
+
+    private final IOperatorEnvironment env;
+
+    private final Map<Object, IStateObject> stateObjectMap;
+
+    private final Map<TaskAttemptId, Task> taskMap;
+
+    private final Map<String, Counter> counterMap;
+
+    private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+    private final IWorkspaceFileFactory fileFactory;
+
+    private final Object globalJobData;
+
+    private final IJobletEventListener jobletEventListener;
+
+    private final int frameSize;
+
+    private JobStatus cleanupStatus;
+
+    private boolean cleanupPending;
+
+    public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx,
+            ActivityClusterGraph acg) {
+        this.nodeController = nodeController;
+        this.appCtx = appCtx;
+        this.jobId = jobId;
+        this.frameSize = acg.getFrameSize();
+        this.acg = acg;
+        partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
+        env = new OperatorEnvironmentImpl(nodeController.getId());
+        stateObjectMap = new HashMap<Object, IStateObject>();
+        taskMap = new HashMap<TaskAttemptId, Task>();
+        counterMap = new HashMap<String, Counter>();
+        deallocatableRegistry = new DefaultDeallocatableRegistry();
+        fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
+        cleanupPending = false;
+        IJobletEventListenerFactory jelf = acg.getJobletEventListenerFactory();
+        if (jelf != null) {
+            IJobletEventListener listener = jelf.createListener(this);
+            this.jobletEventListener = listener;
+            listener.jobletStart();
+        } else {
+            jobletEventListener = null;
+        }
+        IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory();
+        globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
+    }
+
+    @Override
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public ActivityClusterGraph getActivityClusterGraph() {
+        return acg;
+    }
+
+    public IOperatorEnvironment getEnvironment() {
+        return env;
+    }
+
+    public void addTask(Task task) {
+        taskMap.put(task.getTaskAttemptId(), task);
+    }
+
+    public void removeTask(Task task) {
+        taskMap.remove(task.getTaskAttemptId());
+        if (cleanupPending && taskMap.isEmpty()) {
+            performCleanup();
+        }
+    }
+
+    public Map<TaskAttemptId, Task> getTaskMap() {
+        return taskMap;
+    }
+
+    private final class OperatorEnvironmentImpl implements IOperatorEnvironment {
+        private final String nodeId;
+
+        public OperatorEnvironmentImpl(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        public String toString() {
+            return super.toString() + "@" + nodeId;
+        }
+
+        @Override
+        public synchronized void setStateObject(IStateObject taskState) {
+            stateObjectMap.put(taskState.getId(), taskState);
+        }
+
+        @Override
+        public synchronized IStateObject getStateObject(Object id) {
+            return stateObjectMap.get(id);
+        }
+    }
+
+    public NodeControllerService getNodeController() {
+        return nodeController;
+    }
+
+    public void dumpProfile(JobletProfile jProfile) {
+        Map<String, Long> counters = jProfile.getCounters();
+        for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
+            counters.put(e.getKey(), e.getValue().get());
+        }
+        for (Task task : taskMap.values()) {
+            TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(),
+                    new Hashtable<PartitionId, PartitionProfile>(task.getPartitionSendProfile()));
+            task.dumpProfile(taskProfile);
+            jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile);
+        }
+    }
+
+    @Override
+    public INCApplicationContext getApplicationContext() {
+        return appCtx;
+    }
+
+    @Override
+    public ICounterContext getCounterContext() {
+        return this;
+    }
+
+    @Override
+    public void registerDeallocatable(IDeallocatable deallocatable) {
+        deallocatableRegistry.registerDeallocatable(deallocatable);
+    }
+
+    public void close() {
+        nodeController.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                deallocatableRegistry.close();
+            }
+        });
+    }
+
+    ByteBuffer allocateFrame() {
+        return ByteBuffer.allocate(getFrameSize());
+    }
+
+    int getFrameSize() {
+        return frameSize;
+    }
+
+    IIOManager getIOManager() {
+        return appCtx.getRootContext().getIOManager();
+    }
+
+    @Override
+    public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+        return fileFactory.createManagedWorkspaceFile(prefix);
+    }
+
+    @Override
+    public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+        return fileFactory.createUnmanagedWorkspaceFile(prefix);
+    }
+
+    @Override
+    public synchronized ICounter getCounter(String name, boolean create) {
+        Counter counter = counterMap.get(name);
+        if (counter == null && create) {
+            counter = new Counter(name);
+            counterMap.put(name, counter);
+        }
+        return counter;
+    }
+
+    @Override
+    public Object getGlobalJobData() {
+        return globalJobData;
+    }
+
+    public synchronized void advertisePartitionRequest(TaskAttemptId taId, Collection<PartitionId> pids,
+            IPartitionCollector collector, PartitionState minState) throws Exception {
+        for (PartitionId pid : pids) {
+            partitionRequestMap.put(pid, collector);
+            PartitionRequest req = new PartitionRequest(pid, nodeController.getId(), taId, minState);
+            nodeController.getClusterController().registerPartitionRequest(req);
+        }
+    }
+
+    public synchronized void reportPartitionAvailability(PartitionChannel channel) throws HyracksException {
+        IPartitionCollector collector = partitionRequestMap.get(channel.getPartitionId());
+        if (collector != null) {
+            collector.addPartitions(Collections.singleton(channel));
+        }
+    }
+
+    public IJobletEventListener getJobletEventListener() {
+        return jobletEventListener;
+    }
+
+    public void cleanup(JobStatus status) {
+        cleanupStatus = status;
+        cleanupPending = true;
+        if (taskMap.isEmpty()) {
+            performCleanup();
+        }
+    }
+
+    private void performCleanup() {
+        nodeController.getJobletMap().remove(jobId);
+        IJobletEventListener listener = getJobletEventListener();
+        if (listener != null) {
+            listener.jobletFinish(cleanupStatus);
+        }
+        close();
+        cleanupPending = false;
+        try {
+            nodeController.getClusterController().notifyJobletCleanup(jobId, nodeController.getId());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
new file mode 100644
index 0000000..dde7abc
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.dcache.client.DCacheClient;
+import edu.uci.ics.dcache.client.DCacheClientConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+
+public class NCDriver {
+    public static void main(String args[]) throws Exception {
+        NCConfig ncConfig = new NCConfig();
+        CmdLineParser cp = new CmdLineParser(ncConfig);
+        try {
+            cp.parseArgument(args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            cp.printUsage(System.err);
+            return;
+        }
+
+        DCacheClientConfig dccConfig = new DCacheClientConfig();
+        dccConfig.servers = ncConfig.dcacheClientServers;
+        dccConfig.serverLocal = ncConfig.dcacheClientServerLocal;
+        dccConfig.path = ncConfig.dcacheClientPath;
+
+        DCacheClient.get().init(dccConfig);
+
+        final NodeControllerService nService = new NodeControllerService(ncConfig);
+        nService.start();
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                try {
+                    nService.stop();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        while (true) {
+            Thread.sleep(10000);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
new file mode 100644
index 0000000..0195143
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -0,0 +1,465 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.io.File;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.WorkQueue;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
+import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
+import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.ApplicationMessageWork;
+import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
+import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
+import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
+import edu.uci.ics.hyracks.control.nc.work.DestroyApplicationWork;
+import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
+import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPCPerformanceCounters;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class NodeControllerService extends AbstractRemoteService {
+    private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
+
+    private NCConfig ncConfig;
+
+    private final String id;
+
+    private final IHyracksRootContext ctx;
+
+    private final IPCSystem ipc;
+
+    private final PartitionManager partitionManager;
+
+    private final NetworkManager netManager;
+
+    private final WorkQueue queue;
+
+    private final Timer timer;
+
+    private boolean registrationPending;
+
+    private Exception registrationException;
+
+    private IClusterController ccs;
+
+    private final Map<JobId, Joblet> jobletMap;
+
+    private final ExecutorService executor;
+
+    private NodeParameters nodeParameters;
+
+    private HeartbeatTask heartbeatTask;
+
+    private final ServerContext serverCtx;
+
+    private final Map<String, NCApplicationContext> applications;
+
+    private final MemoryMXBean memoryMXBean;
+
+    private final List<GarbageCollectorMXBean> gcMXBeans;
+
+    private final ThreadMXBean threadMXBean;
+
+    private final RuntimeMXBean runtimeMXBean;
+
+    private final OperatingSystemMXBean osMXBean;
+
+    private final Mutable<FutureValue<Map<String, NodeControllerInfo>>> getNodeControllerInfosAcceptor;
+
+    public NodeControllerService(NCConfig ncConfig) throws Exception {
+        this.ncConfig = ncConfig;
+        id = ncConfig.nodeId;
+        executor = Executors.newCachedThreadPool();
+        NodeControllerIPCI ipci = new NodeControllerIPCI();
+        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
+                new CCNCFunctions.SerializerDeserializer());
+        this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices), executor));
+        if (id == null) {
+            throw new Exception("id not set");
+        }
+        partitionManager = new PartitionManager(this);
+        netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
+
+        queue = new WorkQueue();
+        jobletMap = new Hashtable<JobId, Joblet>();
+        timer = new Timer(true);
+        serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
+                NodeControllerService.class.getName()), id));
+        applications = new Hashtable<String, NCApplicationContext>();
+        memoryMXBean = ManagementFactory.getMemoryMXBean();
+        gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
+        threadMXBean = ManagementFactory.getThreadMXBean();
+        runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+        osMXBean = ManagementFactory.getOperatingSystemMXBean();
+        registrationPending = true;
+        getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
+    }
+
+    public IHyracksRootContext getRootContext() {
+        return ctx;
+    }
+
+    private static List<IODeviceHandle> getDevices(String ioDevices) {
+        List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
+        StringTokenizer tok = new StringTokenizer(ioDevices, ",");
+        while (tok.hasMoreElements()) {
+            String devPath = tok.nextToken().trim();
+            devices.add(new IODeviceHandle(new File(devPath), "."));
+        }
+        return devices;
+    }
+
+    private synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+        this.nodeParameters = parameters;
+        this.registrationException = exception;
+        this.registrationPending = false;
+        notifyAll();
+    }
+
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+        FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
+        synchronized (getNodeControllerInfosAcceptor) {
+            while (getNodeControllerInfosAcceptor.getValue() != null) {
+                getNodeControllerInfosAcceptor.wait();
+            }
+            getNodeControllerInfosAcceptor.setValue(fv);
+        }
+        ccs.getNodeControllerInfos();
+        return fv.get();
+    }
+
+    private void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) {
+        FutureValue<Map<String, NodeControllerInfo>> fv;
+        synchronized (getNodeControllerInfosAcceptor) {
+            fv = getNodeControllerInfosAcceptor.getValue();
+            getNodeControllerInfosAcceptor.setValue(null);
+            getNodeControllerInfosAcceptor.notifyAll();
+        }
+        fv.setValue(ncInfos);
+    }
+
+    @Override
+    public void start() throws Exception {
+        LOGGER.log(Level.INFO, "Starting NodeControllerService");
+        ipc.start();
+        netManager.start();
+        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
+        this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
+        HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
+        for (int i = 0; i < gcInfos.length; ++i) {
+            gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
+        }
+        HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
+                        .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+
+        synchronized (this) {
+            while (registrationPending) {
+                wait();
+            }
+        }
+        if (registrationException != null) {
+            throw registrationException;
+        }
+
+        queue.start();
+
+        heartbeatTask = new HeartbeatTask(ccs);
+
+        // Schedule heartbeat generator.
+        timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
+
+        if (nodeParameters.getProfileDumpPeriod() > 0) {
+            // Schedule profile dump generator.
+            timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
+        }
+
+        LOGGER.log(Level.INFO, "Started NodeControllerService");
+    }
+
+    @Override
+    public void stop() throws Exception {
+        LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+        executor.shutdownNow();
+        partitionManager.close();
+        heartbeatTask.cancel();
+        netManager.stop();
+        queue.stop();
+        LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public ServerContext getServerContext() {
+        return serverCtx;
+    }
+
+    public Map<String, NCApplicationContext> getApplications() {
+        return applications;
+    }
+
+    public Map<JobId, Joblet> getJobletMap() {
+        return jobletMap;
+    }
+
+    public NetworkManager getNetworkManager() {
+        return netManager;
+    }
+
+    public PartitionManager getPartitionManager() {
+        return partitionManager;
+    }
+
+    public IClusterController getClusterController() {
+        return ccs;
+    }
+
+    public NodeParameters getNodeParameters() {
+        return nodeParameters;
+    }
+
+    public Executor getExecutor() {
+        return executor;
+    }
+
+    public NCConfig getConfiguration() {
+        return ncConfig;
+    }
+
+    public WorkQueue getWorkQueue() {
+        return queue;
+    }
+
+    private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
+        String ipaddrStr = ncConfig.dataIPAddress;
+        ipaddrStr = ipaddrStr.trim();
+        Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
+        Matcher m = pattern.matcher(ipaddrStr);
+        if (!m.matches()) {
+            throw new Exception(MessageFormat.format(
+                    "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
+        }
+        byte[] ipBytes = new byte[4];
+        ipBytes[0] = (byte) Integer.parseInt(m.group(1));
+        ipBytes[1] = (byte) Integer.parseInt(m.group(2));
+        ipBytes[2] = (byte) Integer.parseInt(m.group(3));
+        ipBytes[3] = (byte) Integer.parseInt(m.group(4));
+        return InetAddress.getByAddress(ipBytes);
+    }
+
+    private class HeartbeatTask extends TimerTask {
+        private IClusterController cc;
+
+        private final HeartbeatData hbData;
+
+        public HeartbeatTask(IClusterController cc) {
+            this.cc = cc;
+            hbData = new HeartbeatData();
+            hbData.gcCollectionCounts = new long[gcMXBeans.size()];
+            hbData.gcCollectionTimes = new long[gcMXBeans.size()];
+        }
+
+        @Override
+        public void run() {
+            MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
+            hbData.heapInitSize = heapUsage.getInit();
+            hbData.heapUsedSize = heapUsage.getUsed();
+            hbData.heapCommittedSize = heapUsage.getCommitted();
+            hbData.heapMaxSize = heapUsage.getMax();
+            MemoryUsage nonheapUsage = memoryMXBean.getNonHeapMemoryUsage();
+            hbData.nonheapInitSize = nonheapUsage.getInit();
+            hbData.nonheapUsedSize = nonheapUsage.getUsed();
+            hbData.nonheapCommittedSize = nonheapUsage.getCommitted();
+            hbData.nonheapMaxSize = nonheapUsage.getMax();
+            hbData.threadCount = threadMXBean.getThreadCount();
+            hbData.peakThreadCount = threadMXBean.getPeakThreadCount();
+            hbData.totalStartedThreadCount = threadMXBean.getTotalStartedThreadCount();
+            hbData.systemLoadAverage = osMXBean.getSystemLoadAverage();
+            int gcN = gcMXBeans.size();
+            for (int i = 0; i < gcN; ++i) {
+                GarbageCollectorMXBean gcMXBean = gcMXBeans.get(i);
+                hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
+                hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
+            }
+
+            MuxDemuxPerformanceCounters netPC = netManager.getPerformanceCounters();
+            hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
+            hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
+            hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
+            hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
+
+            IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
+            hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
+            hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
+            hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
+            hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
+
+            try {
+                cc.nodeHeartbeat(id, hbData);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private class ProfileDumpTask extends TimerTask {
+        private IClusterController cc;
+
+        public ProfileDumpTask(IClusterController cc) {
+            this.cc = cc;
+        }
+
+        @Override
+        public void run() {
+            try {
+                FutureValue<List<JobProfile>> fv = new FutureValue<List<JobProfile>>();
+                BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
+                queue.scheduleAndSync(bjpw);
+                List<JobProfile> profiles = fv.get();
+                if (!profiles.isEmpty()) {
+                    cc.reportProfile(id, profiles);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private final class NodeControllerIPCI implements IIPCI {
+        @Override
+        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+            CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
+            switch (fn.getFunctionId()) {
+                case SEND_APPLICATION_MESSAGE: {
+                    CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
+                            .getAppName(), amf.getNodeId()));
+                    return;
+                }
+                case START_TASKS: {
+                    CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
+                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
+                            .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+                    return;
+                }
+
+                case ABORT_TASKS: {
+                    CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
+                    queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
+                    return;
+                }
+
+                case CLEANUP_JOBLET: {
+                    CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
+                    queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
+                    return;
+                }
+
+                case CREATE_APPLICATION: {
+                    CCNCFunctions.CreateApplicationFunction caf = (CCNCFunctions.CreateApplicationFunction) fn;
+                    queue.schedule(new CreateApplicationWork(NodeControllerService.this, caf.getAppName(), caf
+                            .isDeployHar(), caf.getSerializedDistributedState()));
+                    return;
+                }
+
+                case DESTROY_APPLICATION: {
+                    CCNCFunctions.DestroyApplicationFunction daf = (CCNCFunctions.DestroyApplicationFunction) fn;
+                    queue.schedule(new DestroyApplicationWork(NodeControllerService.this, daf.getAppName()));
+                    return;
+                }
+
+                case REPORT_PARTITION_AVAILABILITY: {
+                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+                    queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
+                            .getPartitionId(), rpaf.getNetworkAddress()));
+                    return;
+                }
+
+                case NODE_REGISTRATION_RESULT: {
+                    CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
+                    setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
+                    return;
+                }
+
+                case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
+                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+                    setNodeControllersInfo(gncirf.getNodeControllerInfos());
+                    return;
+                }
+            }
+            throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+
+        }
+    }
+
+    public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
+        ccs.sendApplicationMessageToCC(data, appName, nodeId);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
new file mode 100644
index 0000000..eba3ec9
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -0,0 +1,355 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+import edu.uci.ics.hyracks.control.nc.work.NotifyTaskCompleteWork;
+import edu.uci.ics.hyracks.control.nc.work.NotifyTaskFailureWork;
+
+public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
+    private final Joblet joblet;
+
+    private final TaskAttemptId taskAttemptId;
+
+    private final String displayName;
+
+    private final Executor executor;
+
+    private final IWorkspaceFileFactory fileFactory;
+
+    private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+    private final Map<String, Counter> counterMap;
+
+    private final IOperatorEnvironment opEnv;
+
+    private final Map<PartitionId, PartitionProfile> partitionSendProfile;
+
+    private final Set<Thread> pendingThreads;
+
+    private IPartitionCollector[] collectors;
+
+    private IOperatorNodePushable operator;
+
+    private volatile boolean failed;
+
+    private ByteArrayOutputStream errorBaos;
+
+    private PrintWriter errorWriter;
+
+    private volatile boolean aborted;
+
+    private NodeControllerService ncs;
+
+    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs) {
+        this.joblet = joblet;
+        this.taskAttemptId = taskId;
+        this.displayName = displayName;
+        this.executor = executor;
+        fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
+        deallocatableRegistry = new DefaultDeallocatableRegistry();
+        counterMap = new HashMap<String, Counter>();
+        opEnv = joblet.getEnvironment();
+        partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
+        pendingThreads = new LinkedHashSet<Thread>();
+        failed = false;
+        errorBaos = new ByteArrayOutputStream();
+        errorWriter = new PrintWriter(errorBaos, true);
+        this.ncs = ncs;
+    }
+
+    public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
+        this.collectors = collectors;
+        this.operator = operator;
+    }
+
+    @Override
+    public ByteBuffer allocateFrame() {
+        return joblet.allocateFrame();
+    }
+
+    @Override
+    public int getFrameSize() {
+        return joblet.getFrameSize();
+    }
+
+    @Override
+    public IIOManager getIOManager() {
+        return joblet.getIOManager();
+    }
+
+    @Override
+    public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+        return fileFactory.createUnmanagedWorkspaceFile(prefix);
+    }
+
+    @Override
+    public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+        return fileFactory.createManagedWorkspaceFile(prefix);
+    }
+
+    @Override
+    public void registerDeallocatable(IDeallocatable deallocatable) {
+        deallocatableRegistry.registerDeallocatable(deallocatable);
+    }
+
+    public void close() {
+        deallocatableRegistry.close();
+    }
+
+    @Override
+    public IHyracksJobletContext getJobletContext() {
+        return joblet;
+    }
+
+    @Override
+    public TaskAttemptId getTaskAttemptId() {
+        return taskAttemptId;
+    }
+
+    @Override
+    public ICounter getCounter(String name, boolean create) {
+        Counter counter = counterMap.get(name);
+        if (counter == null && create) {
+            counter = new Counter(name);
+            counterMap.put(name, counter);
+        }
+        return counter;
+    }
+
+    @Override
+    public ICounterContext getCounterContext() {
+        return this;
+    }
+
+    public Joblet getJoblet() {
+        return joblet;
+    }
+
+    public Map<PartitionId, PartitionProfile> getPartitionSendProfile() {
+        return partitionSendProfile;
+    }
+
+    public synchronized void dumpProfile(TaskProfile tProfile) {
+        Map<String, Long> dumpMap = tProfile.getCounters();
+        for (Counter c : counterMap.values()) {
+            dumpMap.put(c.getName(), c.get());
+        }
+    }
+
+    public void setPartitionSendProfile(PartitionProfile profile) {
+        partitionSendProfile.put(profile.getPartitionId(), profile);
+    }
+
+    public void start() throws HyracksException {
+        aborted = false;
+        executor.execute(this);
+    }
+
+    public synchronized void abort() {
+        aborted = true;
+        for (IPartitionCollector c : collectors) {
+            c.abort();
+        }
+        for (Thread t : pendingThreads) {
+            t.interrupt();
+        }
+    }
+
+    private synchronized void addPendingThread(Thread t) {
+        pendingThreads.add(t);
+    }
+
+    private synchronized void removePendingThread(Thread t) {
+        pendingThreads.remove(t);
+        if (pendingThreads.isEmpty()) {
+            notifyAll();
+        }
+    }
+
+    public synchronized void waitForCompletion() throws InterruptedException {
+        while (!pendingThreads.isEmpty()) {
+            wait();
+        }
+    }
+
+    @Override
+    public void run() {
+        Thread ct = Thread.currentThread();
+        String threadName = ct.getName();
+        addPendingThread(ct);
+        try {
+            ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
+            operator.initialize();
+            try {
+                if (collectors.length > 0) {
+                    final Semaphore sem = new Semaphore(collectors.length - 1);
+                    for (int i = 1; i < collectors.length; ++i) {
+                        final IPartitionCollector collector = collectors[i];
+                        final IFrameWriter writer = operator.getInputFrameWriter(i);
+                        sem.acquire();
+                        final int cIdx = i;
+                        executor.execute(new Runnable() {
+                            public void run() {
+                                if (aborted) {
+                                    return;
+                                }
+                                Thread thread = Thread.currentThread();
+                                addPendingThread(thread);
+                                String oldName = thread.getName();
+                                thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+                                try {
+                                    pushFrames(collector, writer);
+                                } catch (HyracksDataException e) {
+                                    synchronized (Task.this) {
+                                        failed = true;
+                                        errorWriter.println("Exception caught by thread: " + thread.getName());
+                                        e.printStackTrace(errorWriter);
+                                        errorWriter.println();
+                                    }
+                                } finally {
+                                    thread.setName(oldName);
+                                    sem.release();
+                                    removePendingThread(thread);
+                                }
+                            }
+                        });
+                    }
+                    try {
+                        pushFrames(collectors[0], operator.getInputFrameWriter(0));
+                    } finally {
+                        sem.acquire(collectors.length - 1);
+                    }
+                }
+            } finally {
+                operator.deinitialize();
+            }
+            NodeControllerService ncs = joblet.getNodeController();
+            ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
+        } catch (Exception e) {
+            failed = true;
+            errorWriter.println("Exception caught by thread: " + ct.getName());
+            e.printStackTrace(errorWriter);
+            errorWriter.println();
+        } finally {
+            ct.setName(threadName);
+            close();
+            removePendingThread(ct);
+        }
+        if (failed) {
+            errorWriter.close();
+            NodeControllerService ncs = joblet.getNodeController();
+            try {
+                ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, errorBaos.toString("UTF-8")));
+            } catch (UnsupportedEncodingException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void pushFrames(IPartitionCollector collector, IFrameWriter writer) throws HyracksDataException {
+        if (aborted) {
+            return;
+        }
+        try {
+            collector.open();
+            try {
+                joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
+                        PartitionState.STARTED);
+                IFrameReader reader = collector.getReader();
+                reader.open();
+                try {
+                    writer.open();
+                    try {
+                        ByteBuffer buffer = allocateFrame();
+                        while (reader.nextFrame(buffer)) {
+                            if (aborted) {
+                                return;
+                            }
+                            buffer.flip();
+                            writer.nextFrame(buffer);
+                            buffer.compact();
+                        }
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw e;
+                    } finally {
+                        writer.close();
+                    }
+                } finally {
+                    reader.close();
+                }
+            } finally {
+                collector.close();
+            }
+        } catch (HyracksException e) {
+            throw new HyracksDataException(e);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void setStateObject(IStateObject taskState) {
+        opEnv.setStateObject(taskState);
+    }
+
+    @Override
+    public IStateObject getStateObject(Object id) {
+        return opEnv.getStateObject(id);
+    }
+
+    @Override
+    public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
+        this.ncs.sendApplicationMessageToCC(message, this.getJobletContext().getApplicationContext()
+                .getApplicationName(), nodeId);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
new file mode 100644
index 0000000..1121c6c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -0,0 +1,60 @@
+package edu.uci.ics.hyracks.control.nc.application;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+
+public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
+    private final String nodeId;
+    private final IHyracksRootContext rootCtx;
+    private Object appObject;
+
+    public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String appName, String nodeId)
+            throws IOException {
+        super(serverCtx, appName);
+        this.nodeId = nodeId;
+        this.rootCtx = rootCtx;
+    }
+
+    @Override
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public void setDistributedState(Serializable state) {
+        distributedState = state;
+    }
+
+    @Override
+    protected void start() throws Exception {
+        ((INCBootstrap) bootstrap).setApplicationContext(this);
+        bootstrap.start();
+    }
+
+    @Override
+    protected void stop() throws Exception {
+        if (bootstrap != null) {
+            bootstrap.stop();
+        }
+    }
+
+    @Override
+    public IHyracksRootContext getRootContext() {
+        return rootCtx;
+    }
+
+    @Override
+    public void setApplicationObject(Object object) {
+        this.appObject = object;
+    }
+
+    @Override
+    public Object getApplicationObject() {
+        return appObject;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/FileHandle.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/FileHandle.java
new file mode 100644
index 0000000..8916fb1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/FileHandle.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+public class FileHandle implements IFileHandle {
+    private final FileReference fileRef;
+
+    private RandomAccessFile raf;
+
+    private FileChannel channel;
+
+    public FileHandle(FileReference fileRef) {
+        this.fileRef = fileRef;
+    }
+
+    public void open(IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode) throws IOException {
+        String mode;
+        switch (rwMode) {
+            case READ_ONLY:
+                mode = "r";
+                break;
+
+            case READ_WRITE:
+                fileRef.getFile().getAbsoluteFile().getParentFile().mkdirs();
+                switch (syncMode) {
+                    case METADATA_ASYNC_DATA_ASYNC:
+                        mode = "rw";
+                        break;
+
+                    case METADATA_ASYNC_DATA_SYNC:
+                        mode = "rwd";
+                        break;
+
+                    case METADATA_SYNC_DATA_SYNC:
+                        mode = "rws";
+                        break;
+
+                    default:
+                        throw new IllegalArgumentException();
+                }
+                break;
+
+            default:
+                throw new IllegalArgumentException();
+        }
+        raf = new RandomAccessFile(fileRef.getFile(), mode);
+        channel = raf.getChannel();
+    }
+
+    public void close() throws IOException {
+        channel.close();
+        raf.close();
+    }
+
+    public FileReference getFileReference() {
+        return fileRef;
+    }
+
+    public RandomAccessFile getRandomAccessFile() {
+        return raf;
+    }
+
+    public FileChannel getFileChannel() {
+        return channel;
+    }
+
+    public void sync(boolean metadata) throws IOException {
+        channel.force(metadata);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
new file mode 100644
index 0000000..3b13f32
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOFuture;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+
+public class IOManager implements IIOManager {
+    private final List<IODeviceHandle> ioDevices;
+
+    private final Executor executor;
+
+    private final List<IODeviceHandle> workAreaIODevices;
+
+    private int workAreaDeviceIndex;
+
+    public IOManager(List<IODeviceHandle> devices, Executor executor) throws HyracksException {
+        this.ioDevices = Collections.unmodifiableList(devices);
+        this.executor = executor;
+        workAreaIODevices = new ArrayList<IODeviceHandle>();
+        for (IODeviceHandle d : ioDevices) {
+            if (d.getWorkAreaPath() != null) {
+                new File(d.getPath(), d.getWorkAreaPath()).mkdirs();
+                workAreaIODevices.add(d);
+            }
+        }
+        if (workAreaIODevices.isEmpty()) {
+            throw new HyracksException("No devices with work areas found");
+        }
+        workAreaDeviceIndex = 0;
+    }
+
+    @Override
+    public List<IODeviceHandle> getIODevices() {
+        return ioDevices;
+    }
+
+    @Override
+    public IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+            throws HyracksDataException {
+        FileHandle fHandle = new FileHandle(fileRef);
+        try {
+            fHandle.open(rwMode, syncMode);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return fHandle;
+    }
+
+    @Override
+    public int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        try {
+            int n = 0;
+            int remaining = data.remaining();
+            while (remaining > 0) {
+                int len = ((FileHandle) fHandle).getFileChannel().write(data, offset);
+                if (len < 0) {
+                    throw new HyracksDataException("Error writing to file: "
+                            + ((FileHandle) fHandle).getFileReference().toString());
+                }
+                remaining -= len;
+                offset += len;
+                n += len;
+            }
+            return n;
+        } catch (HyracksDataException e) {
+            throw e;
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        try {
+            int n = 0;
+            int remaining = data.remaining();
+            while (remaining > 0) {
+                int len = ((FileHandle) fHandle).getFileChannel().read(data, offset);
+                if (len < 0) {
+                    return -1;
+                }
+                remaining -= len;
+                offset += len;
+                n += len;
+            }
+            return n;
+        } catch (HyracksDataException e) {
+            throw e;
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) {
+        AsyncWriteRequest req = new AsyncWriteRequest((FileHandle) fHandle, offset, data);
+        executor.execute(req);
+        return req;
+    }
+
+    @Override
+    public IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) {
+        AsyncReadRequest req = new AsyncReadRequest((FileHandle) fHandle, offset, data);
+        executor.execute(req);
+        return req;
+    }
+
+    @Override
+    public void close(IFileHandle fHandle) throws HyracksDataException {
+        try {
+            ((FileHandle) fHandle).close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public synchronized FileReference createWorkspaceFile(String prefix) throws HyracksDataException {
+        IODeviceHandle dev = workAreaIODevices.get(workAreaDeviceIndex);
+        workAreaDeviceIndex = (workAreaDeviceIndex + 1) % workAreaIODevices.size();
+        String waPath = dev.getWorkAreaPath();
+        File waf;
+        try {
+            waf = File.createTempFile(prefix, ".waf", new File(dev.getPath(), waPath));
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return dev.createFileReference(waPath + File.separator + waf.getName());
+    }
+
+    private abstract class AsyncRequest implements IIOFuture, Runnable {
+        protected final FileHandle fHandle;
+        protected final long offset;
+        protected final ByteBuffer data;
+        private boolean complete;
+        private HyracksDataException exception;
+        private int result;
+
+        private AsyncRequest(FileHandle fHandle, long offset, ByteBuffer data) {
+            this.fHandle = fHandle;
+            this.offset = offset;
+            this.data = data;
+            complete = false;
+            exception = null;
+        }
+
+        @Override
+        public void run() {
+            HyracksDataException hde = null;
+            int res = -1;
+            try {
+                res = performOperation();
+            } catch (HyracksDataException e) {
+                hde = e;
+            }
+            synchronized (this) {
+                exception = hde;
+                result = res;
+                complete = true;
+                notifyAll();
+            }
+        }
+
+        protected abstract int performOperation() throws HyracksDataException;
+
+        @Override
+        public synchronized int synchronize() throws HyracksDataException, InterruptedException {
+            while (!complete) {
+                wait();
+            }
+            if (exception != null) {
+                throw exception;
+            }
+            return result;
+        }
+
+        @Override
+        public synchronized boolean isComplete() {
+            return complete;
+        }
+    }
+
+    private class AsyncReadRequest extends AsyncRequest {
+        private AsyncReadRequest(FileHandle fHandle, long offset, ByteBuffer data) {
+            super(fHandle, offset, data);
+        }
+
+        @Override
+        protected int performOperation() throws HyracksDataException {
+            return syncRead(fHandle, offset, data);
+        }
+    }
+
+    private class AsyncWriteRequest extends AsyncRequest {
+        private AsyncWriteRequest(FileHandle fHandle, long offset, ByteBuffer data) {
+            super(fHandle, offset, data);
+        }
+
+        @Override
+        protected int performOperation() throws HyracksDataException {
+            return syncWrite(fHandle, offset, data);
+        }
+    }
+
+    @Override
+    public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
+        try {
+            ((FileHandle) fileHandle).sync(metadata);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/WorkspaceFileFactory.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/WorkspaceFileFactory.java
new file mode 100644
index 0000000..16421ab
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/WorkspaceFileFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.io;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
+
+public final class WorkspaceFileFactory implements IWorkspaceFileFactory {
+    private final IDeallocatableRegistry registry;
+    private final IOManager ioManager;
+
+    public WorkspaceFileFactory(IDeallocatableRegistry registry, IOManager ioManager) {
+        this.registry = registry;
+        this.ioManager = ioManager;
+    }
+
+    @Override
+    public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+        final FileReference fRef = ioManager.createWorkspaceFile(prefix);
+        registry.registerDeallocatable(new IDeallocatable() {
+            @Override
+            public void deallocate() {
+                fRef.delete();
+            }
+        });
+        return fRef;
+    }
+
+    @Override
+    public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+        return ioManager.createWorkspaceFile(prefix);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
new file mode 100644
index 0000000..1d5af84
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkInputChannel implements IInputChannel {
+    private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+
+    private final NetworkManager netManager;
+
+    private final SocketAddress remoteAddress;
+
+    private final PartitionId partitionId;
+
+    private final Queue<ByteBuffer> fullQueue;
+
+    private final int nBuffers;
+
+    private ChannelControlBlock ccb;
+
+    private IInputChannelMonitor monitor;
+
+    private Object attachment;
+
+    public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
+            int nBuffers) {
+        this.netManager = netManager;
+        this.remoteAddress = remoteAddress;
+        this.partitionId = partitionId;
+        fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+        this.nBuffers = nBuffers;
+    }
+
+    @Override
+    public void registerMonitor(IInputChannelMonitor monitor) {
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public synchronized ByteBuffer getNextBuffer() {
+        return fullQueue.poll();
+    }
+
+    @Override
+    public void recycleBuffer(ByteBuffer buffer) {
+        buffer.clear();
+        ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+    }
+
+    @Override
+    public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+        try {
+            ccb = netManager.connect(remoteAddress);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+        for (int i = 0; i < nBuffers; ++i) {
+            ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+        }
+        ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
+        writeBuffer.putLong(partitionId.getJobId().getId());
+        writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+        writeBuffer.putInt(partitionId.getSenderIndex());
+        writeBuffer.putInt(partitionId.getReceiverIndex());
+        writeBuffer.flip();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Sending partition request: " + partitionId + " on channel: " + ccb);
+        }
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+
+    }
+
+    private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            fullQueue.add(buffer);
+            monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
+        }
+
+        @Override
+        public void close() {
+            monitor.notifyEndOfStream(NetworkInputChannel.this);
+        }
+
+        @Override
+        public void error(int ecode) {
+            monitor.notifyFailure(NetworkInputChannel.this);
+        }
+    }
+
+    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            // do nothing
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
new file mode 100644
index 0000000..b805595
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class NetworkManager {
+    private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
+
+    private static final int MAX_CONNECTION_ATTEMPTS = 5;
+
+    static final int INITIAL_MESSAGE_SIZE = 20;
+
+    private final PartitionManager partitionManager;
+
+    private final MuxDemux md;
+
+    private NetworkAddress networkAddress;
+
+    public NetworkManager(InetAddress inetAddress, PartitionManager partitionManager, int nThreads) throws IOException {
+        this.partitionManager = partitionManager;
+        md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
+                MAX_CONNECTION_ATTEMPTS);
+    }
+
+    public void start() throws IOException {
+        md.start();
+        InetSocketAddress sockAddr = md.getLocalAddress();
+        networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
+    }
+
+    public NetworkAddress getNetworkAddress() {
+        return networkAddress;
+    }
+
+    public void stop() {
+
+    }
+
+    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+        MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+        return mConn.openChannel();
+    }
+
+    private class ChannelOpenListener implements IChannelOpenListener {
+        @Override
+        public void channelOpened(ChannelControlBlock channel) {
+            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+            channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+        }
+    }
+
+    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+        private final ChannelControlBlock ccb;
+
+        private NetworkOutputChannel noc;
+
+        public InitialBufferAcceptor(ChannelControlBlock ccb) {
+            this.ccb = ccb;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            PartitionId pid = readInitialMessage(buffer);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Received initial partition request: " + pid + " on channel: " + ccb);
+            }
+            noc = new NetworkOutputChannel(ccb, 1);
+            try {
+                partitionManager.registerPartitionRequest(pid, noc);
+            } catch (HyracksException e) {
+                noc.abort();
+            }
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void error(int ecode) {
+            if (noc != null) {
+                noc.abort();
+            }
+        }
+    }
+
+    private static PartitionId readInitialMessage(ByteBuffer buffer) {
+        JobId jobId = new JobId(buffer.getLong());
+        ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
+        int senderIndex = buffer.getInt();
+        int receiverIndex = buffer.getInt();
+        return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
+    }
+
+    public MuxDemuxPerformanceCounters getPerformanceCounters() {
+        return md.getPerformanceCounters();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
new file mode 100644
index 0000000..9024e18
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkOutputChannel implements IFrameWriter {
+    private final ChannelControlBlock ccb;
+
+    private final int nBuffers;
+
+    private final Deque<ByteBuffer> emptyStack;
+
+    private boolean aborted;
+
+    public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
+        this.ccb = ccb;
+        this.nBuffers = nBuffers;
+        emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+    }
+
+    public void setTaskContext(IHyracksTaskContext ctx) {
+        for (int i = 0; i < nBuffers; ++i) {
+            emptyStack.push(ByteBuffer.allocateDirect(ctx.getFrameSize()));
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        ByteBuffer destBuffer = null;
+        synchronized (this) {
+            while (true) {
+                if (aborted) {
+                    throw new HyracksDataException("Connection has been aborted");
+                }
+                destBuffer = emptyStack.poll();
+                if (destBuffer != null) {
+                    break;
+                }
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        }
+        buffer.position(0);
+        buffer.limit(destBuffer.capacity());
+        destBuffer.clear();
+        destBuffer.put(buffer);
+        destBuffer.flip();
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    void abort() {
+        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+        synchronized (NetworkOutputChannel.this) {
+            aborted = true;
+            NetworkOutputChannel.this.notifyAll();
+        }
+    }
+
+    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            synchronized (NetworkOutputChannel.this) {
+                emptyStack.push(buffer);
+                NetworkOutputChannel.this.notifyAll();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
new file mode 100644
index 0000000..ae4fd2b
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializedPartition implements IPartition {
+    private final IHyracksTaskContext ctx;
+
+    private final FileReference partitionFile;
+
+    private final Executor executor;
+
+    private final IOManager ioManager;
+
+    public MaterializedPartition(IHyracksTaskContext ctx, FileReference partitionFile, Executor executor,
+            IOManager ioManager) {
+        this.ctx = ctx;
+        this.partitionFile = partitionFile;
+        this.executor = executor;
+        this.ioManager = ioManager;
+    }
+
+    @Override
+    public IHyracksTaskContext getTaskContext() {
+        return ctx;
+    }
+
+    @Override
+    public void deallocate() {
+        partitionFile.delete();
+    }
+
+    @Override
+    public void writeTo(final IFrameWriter writer) {
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
+                            IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    try {
+                        writer.open();
+                        try {
+                            long offset = 0;
+                            ByteBuffer buffer = ctx.allocateFrame();
+                            while (true) {
+                                buffer.clear();
+                                long size = ioManager.syncRead(fh, offset, buffer);
+                                if (size < 0) {
+                                    break;
+                                } else if (size < buffer.capacity()) {
+                                    throw new HyracksDataException("Premature end of file");
+                                }
+                                offset += size;
+                                buffer.flip();
+                                writer.nextFrame(buffer);
+                            }
+                        } finally {
+                            writer.close();
+                        }
+                    } finally {
+                        ioManager.close(fh);
+                    }
+                } catch (HyracksDataException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+    }
+
+    @Override
+    public boolean isReusable() {
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
new file mode 100644
index 0000000..16e31f7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class MaterializedPartitionInputChannel implements IInputChannel {
+    private final int nBuffers;
+
+    private final Queue<ByteBuffer> emptyQueue;
+
+    private final Queue<ByteBuffer> fullQueue;
+
+    private final PartitionId pid;
+
+    private final PartitionManager manager;
+
+    private final FrameWriter writer;
+
+    private IInputChannelMonitor monitor;
+
+    private Object attachment;
+
+    public MaterializedPartitionInputChannel(int nBuffers, PartitionId pid, PartitionManager manager) {
+        this.nBuffers = nBuffers;
+        this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+        fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+        this.pid = pid;
+        this.manager = manager;
+        writer = new FrameWriter();
+    }
+
+    @Override
+    public void registerMonitor(IInputChannelMonitor monitor) {
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public ByteBuffer getNextBuffer() {
+        return fullQueue.poll();
+    }
+
+    @Override
+    public void recycleBuffer(ByteBuffer buffer) {
+        buffer.clear();
+        synchronized (this) {
+            emptyQueue.add(buffer);
+            notifyAll();
+        }
+    }
+
+    @Override
+    public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+        for (int i = 0; i < nBuffers; ++i) {
+            emptyQueue.add(ctx.allocateFrame());
+        }
+        IPartition partition = manager.getPartition(pid);
+        partition.writeTo(writer);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+
+    }
+
+    private class FrameWriter implements IFrameWriter {
+        @Override
+        public void open() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            synchronized (MaterializedPartitionInputChannel.this) {
+                while (emptyQueue.isEmpty()) {
+                    try {
+                        MaterializedPartitionInputChannel.this.wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                ByteBuffer destFrame = emptyQueue.poll();
+                buffer.position(0);
+                buffer.limit(buffer.capacity());
+                destFrame.clear();
+                destFrame.put(buffer);
+                fullQueue.add(destFrame);
+                monitor.notifyDataAvailability(MaterializedPartitionInputChannel.this, 1);
+            }
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            monitor.notifyEndOfStream(MaterializedPartitionInputChannel.this);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
new file mode 100644
index 0000000..7bd0eb1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializedPartitionWriter implements IFrameWriter {
+    private static final Logger LOGGER = Logger.getLogger(MaterializedPartitionWriter.class.getName());
+
+    private final IHyracksTaskContext ctx;
+
+    private final PartitionManager manager;
+
+    private final PartitionId pid;
+
+    private final TaskAttemptId taId;
+
+    private final Executor executor;
+
+    private FileReference fRef;
+
+    private IFileHandle handle;
+
+    private long size;
+
+    private boolean failed;
+
+    public MaterializedPartitionWriter(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
+            TaskAttemptId taId, Executor executor) {
+        this.ctx = ctx;
+        this.manager = manager;
+        this.pid = pid;
+        this.taId = taId;
+        this.executor = executor;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("open(" + pid + " by " + taId);
+        }
+        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+        handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        size = 0;
+        failed = false;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        size += ctx.getIOManager().syncWrite(handle, size, buffer);
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("close(" + pid + " by " + taId);
+        }
+        ctx.getIOManager().close(handle);
+        if (!failed) {
+            manager.registerPartition(pid, taId,
+                    new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
+                    PartitionState.COMMITTED);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
new file mode 100644
index 0000000..62320c5
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializingPipelinedPartition implements IFrameWriter, IPartition {
+    private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
+
+    private final IHyracksTaskContext ctx;
+
+    private final Executor executor;
+
+    private final IOManager ioManager;
+
+    private final PartitionManager manager;
+
+    private final PartitionId pid;
+
+    private final TaskAttemptId taId;
+
+    private FileReference fRef;
+
+    private IFileHandle handle;
+
+    private long size;
+
+    private boolean eos;
+
+    private boolean failed;
+
+    public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
+            TaskAttemptId taId, Executor executor) {
+        this.ctx = ctx;
+        this.executor = executor;
+        this.ioManager = (IOManager) ctx.getIOManager();
+        this.manager = manager;
+        this.pid = pid;
+        this.taId = taId;
+    }
+
+    @Override
+    public IHyracksTaskContext getTaskContext() {
+        return ctx;
+    }
+
+    @Override
+    public void deallocate() {
+        fRef.delete();
+    }
+
+    @Override
+    public void writeTo(final IFrameWriter writer) {
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    IFileHandle fh = ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
+                            IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    try {
+                        writer.open();
+                        try {
+                            long offset = 0;
+                            ByteBuffer buffer = ctx.allocateFrame();
+                            boolean fail = false;
+                            boolean done = false;
+                            while (!fail && !done) {
+                                synchronized (MaterializingPipelinedPartition.this) {
+                                    while (offset >= size && !eos && !failed) {
+                                        try {
+                                            MaterializingPipelinedPartition.this.wait();
+                                        } catch (InterruptedException e) {
+                                            throw new HyracksDataException(e);
+                                        }
+                                    }
+                                    fail = failed;
+                                    done = eos && offset >= size;
+                                }
+                                if (fail) {
+                                    writer.fail();
+                                } else if (!done) {
+                                    buffer.clear();
+                                    long readLen = ioManager.syncRead(fh, offset, buffer);
+                                    if (readLen < buffer.capacity()) {
+                                        throw new HyracksDataException("Premature end of file");
+                                    }
+                                    offset += readLen;
+                                    buffer.flip();
+                                    writer.nextFrame(buffer);
+                                }
+                            }
+                        } finally {
+                            writer.close();
+                        }
+                    } finally {
+                        ioManager.close(fh);
+                    }
+                } catch (HyracksDataException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+    }
+
+    @Override
+    public boolean isReusable() {
+        return true;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("open(" + pid + " by " + taId);
+        }
+        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+        handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        size = 0;
+        eos = false;
+        failed = false;
+        manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+    }
+
+    @Override
+    public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        size += ctx.getIOManager().syncWrite(handle, size, buffer);
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void fail() throws HyracksDataException {
+        failed = true;
+        notifyAll();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("close(" + pid + " by " + taId);
+        }
+        boolean commit = false;
+        synchronized (this) {
+            eos = true;
+            ctx.getIOManager().close(handle);
+            handle = null;
+            commit = !failed;
+            notifyAll();
+        }
+        if (commit) {
+            manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
new file mode 100644
index 0000000..45c091a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.net.NetworkOutputChannel;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+
+public class PartitionManager {
+    private final NodeControllerService ncs;
+
+    private final Map<PartitionId, List<IPartition>> partitionMap;
+
+    private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+    private final IWorkspaceFileFactory fileFactory;
+
+    public PartitionManager(NodeControllerService ncs) {
+        this.ncs = ncs;
+        partitionMap = new HashMap<PartitionId, List<IPartition>>();
+        deallocatableRegistry = new DefaultDeallocatableRegistry();
+        fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
+    }
+
+    public void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+            throws HyracksDataException {
+        synchronized (this) {
+            List<IPartition> pList = partitionMap.get(pid);
+            if (pList == null) {
+                pList = new ArrayList<IPartition>();
+                partitionMap.put(pid, pList);
+            }
+            pList.add(partition);
+        }
+        updatePartitionState(pid, taId, partition, state);
+    }
+
+    public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+            throws HyracksDataException {
+        PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
+        desc.setState(state);
+        try {
+            ncs.getClusterController().registerPartitionProvider(desc);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public synchronized IPartition getPartition(PartitionId pid) {
+        return partitionMap.get(pid).get(0);
+    }
+
+    public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
+        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = partitionMap.entrySet().iterator(); i.hasNext();) {
+            Map.Entry<PartitionId, List<IPartition>> e = i.next();
+            PartitionId pid = e.getKey();
+            if (jobId.equals(pid.getJobId())) {
+                for (IPartition p : e.getValue()) {
+                    unregisteredPartitions.add(p);
+                }
+                i.remove();
+            }
+        }
+    }
+
+    public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
+            throws HyracksException {
+        List<IPartition> pList = partitionMap.get(partitionId);
+        if (pList != null && !pList.isEmpty()) {
+            IPartition partition = pList.get(0);
+            writer.setTaskContext(partition.getTaskContext());
+            partition.writeTo(writer);
+            if (!partition.isReusable()) {
+                partitionMap.remove(partitionId);
+            }
+        } else {
+            throw new HyracksException("Request for unknown partition " + partitionId);
+        }
+    }
+
+    public IWorkspaceFileFactory getFileFactory() {
+        return fileFactory;
+    }
+
+    public void close() {
+        deallocatableRegistry.close();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
new file mode 100644
index 0000000..e427cf3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+
+public class PipelinedPartition implements IFrameWriter, IPartition {
+    private final IHyracksTaskContext ctx;
+
+    private final PartitionManager manager;
+
+    private final PartitionId pid;
+
+    private final TaskAttemptId taId;
+
+    private IFrameWriter delegate;
+
+    private boolean pendingConnection;
+
+    private boolean failed;
+
+    public PipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
+        this.ctx = ctx;
+        this.manager = manager;
+        this.pid = pid;
+        this.taId = taId;
+    }
+
+    @Override
+    public IHyracksTaskContext getTaskContext() {
+        return ctx;
+    }
+
+    @Override
+    public boolean isReusable() {
+        return false;
+    }
+
+    @Override
+    public void deallocate() {
+        // do nothing
+    }
+
+    @Override
+    public synchronized void writeTo(IFrameWriter writer) {
+        delegate = writer;
+        notifyAll();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+        failed = false;
+        pendingConnection = true;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        ensureConnected();
+        delegate.nextFrame(buffer);
+    }
+
+    private void ensureConnected() throws HyracksDataException {
+        if (pendingConnection) {
+            synchronized (this) {
+                while (delegate == null) {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+            delegate.open();
+        }
+        pendingConnection = false;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+        if (delegate != null) {
+            delegate.fail();
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (!failed) {
+            ensureConnected();
+            manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+            delegate.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
new file mode 100644
index 0000000..b1e58fc
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class ReceiveSideMaterializingCollector implements IPartitionCollector {
+    private final IHyracksTaskContext ctx;
+
+    private PartitionManager manager;
+
+    private final IPartitionCollector delegate;
+
+    private final TaskAttemptId taId;
+
+    private final Executor executor;
+
+    public ReceiveSideMaterializingCollector(IHyracksTaskContext ctx, PartitionManager manager,
+            IPartitionCollector collector, TaskAttemptId taId, Executor executor) {
+        this.ctx = ctx;
+        this.manager = manager;
+        this.delegate = collector;
+        this.taId = taId;
+        this.executor = executor;
+    }
+
+    @Override
+    public JobId getJobId() {
+        return delegate.getJobId();
+    }
+
+    @Override
+    public ConnectorDescriptorId getConnectorId() {
+        return delegate.getConnectorId();
+    }
+
+    @Override
+    public int getReceiverIndex() {
+        return delegate.getReceiverIndex();
+    }
+
+    @Override
+    public void open() throws HyracksException {
+        delegate.open();
+    }
+
+    @Override
+    public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+        for (final PartitionChannel pc : partitions) {
+            PartitionWriter writer = new PartitionWriter(pc);
+            executor.execute(writer);
+        }
+    }
+
+    private class PartitionWriter implements Runnable, IInputChannelMonitor {
+        private PartitionChannel pc;
+
+        private final AtomicInteger nAvailableFrames;
+
+        private final AtomicBoolean eos;
+
+        private final AtomicBoolean failed;
+
+        public PartitionWriter(PartitionChannel pc) {
+            this.pc = pc;
+            nAvailableFrames = new AtomicInteger(0);
+            eos = new AtomicBoolean(false);
+            failed = new AtomicBoolean(false);
+        }
+
+        @Override
+        public synchronized void notifyFailure(IInputChannel channel) {
+            failed.set(true);
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+            nAvailableFrames.addAndGet(nFrames);
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void notifyEndOfStream(IInputChannel channel) {
+            eos.set(true);
+            notifyAll();
+        }
+
+        @Override
+        public void run() {
+            PartitionId pid = pc.getPartitionId();
+            MaterializedPartitionWriter mpw = new MaterializedPartitionWriter(ctx, manager, pid, taId, executor);
+            IInputChannel channel = pc.getInputChannel();
+            try {
+                channel.registerMonitor(this);
+                channel.open(ctx);
+                mpw.open();
+                while (true) {
+                    if (nAvailableFrames.get() > 0) {
+                        ByteBuffer buffer = channel.getNextBuffer();
+                        nAvailableFrames.decrementAndGet();
+                        mpw.nextFrame(buffer);
+                        channel.recycleBuffer(buffer);
+                    } else if (eos.get()) {
+                        break;
+                    } else if (failed.get()) {
+                        throw new HyracksDataException("Failure occurred on input");
+                    } else {
+                        try {
+                            synchronized (this) {
+                                if (nAvailableFrames.get() <= 0 && !eos.get() && !failed.get()) {
+                                    wait();
+                                }
+                            }
+                        } catch (InterruptedException e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                }
+                mpw.close();
+                channel.close();
+                delegate.addPartitions(Collections.singleton(new PartitionChannel(pid,
+                        new MaterializedPartitionInputChannel(1, pid, manager))));
+            } catch (HyracksException e) {
+            }
+        }
+    }
+
+    @Override
+    public IFrameReader getReader() throws HyracksException {
+        return delegate.getReader();
+    }
+
+    @Override
+    public void close() throws HyracksException {
+        delegate.close();
+    }
+
+    @Override
+    public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
+        return delegate.getRequiredPartitionIds();
+    }
+
+    @Override
+    public void abort() {
+        delegate.abort();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java
new file mode 100644
index 0000000..96fcf75
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.profiling;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+
+public class ConnectorReceiverProfilingFrameReader implements IFrameReader {
+    private final IFrameReader reader;
+    private final ICounter openCounter;
+    private final ICounter closeCounter;
+    private final ICounter frameCounter;
+
+    public ConnectorReceiverProfilingFrameReader(IHyracksTaskContext ctx, IFrameReader reader,
+            ConnectorDescriptorId cdId, int receiverIndex) {
+        this.reader = reader;
+        this.openCounter = ctx.getCounterContext().getCounter(cdId + ".receiver." + receiverIndex + ".open", true);
+        this.closeCounter = ctx.getCounterContext().getCounter(cdId + ".receiver." + receiverIndex + ".close", true);
+        this.frameCounter = ctx.getCounterContext()
+                .getCounter(cdId + ".receiver." + receiverIndex + ".nextFrame", true);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        reader.open();
+        openCounter.update(1);
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        boolean status = reader.nextFrame(buffer);
+        if (status) {
+            frameCounter.update(1);
+        }
+        return status;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        reader.close();
+        closeCounter.update(1);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
new file mode 100644
index 0000000..be7b319
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.profiling;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+
+public class ConnectorSenderProfilingFrameWriter implements IFrameWriter {
+    private final IFrameWriter writer;
+    private final ICounter openCounter;
+    private final ICounter closeCounter;
+    private final ICounter frameCounter;
+
+    public ConnectorSenderProfilingFrameWriter(IHyracksTaskContext ctx, IFrameWriter writer,
+            ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
+        this.writer = writer;
+        int attempt = ctx.getTaskAttemptId().getAttempt();
+        this.openCounter = ctx.getCounterContext().getCounter(
+                cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".open", true);
+        this.closeCounter = ctx.getCounterContext().getCounter(
+                cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".close", true);
+        this.frameCounter = ctx.getCounterContext().getCounter(
+                cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".nextFrame", true);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+        openCounter.update(1);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        frameCounter.update(1);
+        writer.nextFrame(buffer);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        closeCounter.update(1);
+        writer.close();
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
new file mode 100644
index 0000000..affa01c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.profiling;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class ProfilingPartitionWriterFactory implements IPartitionWriterFactory {
+    private static final int N_SAMPLES = 64;
+
+    private final IHyracksTaskContext ctx;
+
+    private final IConnectorDescriptor cd;
+
+    private final int senderIndex;
+
+    private final IPartitionWriterFactory delegate;
+
+    public ProfilingPartitionWriterFactory(IHyracksTaskContext ctx, IConnectorDescriptor cd, int senderIndex,
+            IPartitionWriterFactory delegate) {
+        this.ctx = ctx;
+        this.cd = cd;
+        this.senderIndex = senderIndex;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public IFrameWriter createFrameWriter(final int receiverIndex) throws HyracksDataException {
+        final IFrameWriter writer = new ConnectorSenderProfilingFrameWriter(ctx,
+                delegate.createFrameWriter(receiverIndex), cd.getConnectorId(), senderIndex, receiverIndex);
+        return new IFrameWriter() {
+            private long openTime;
+
+            private long closeTime;
+
+            MultiResolutionEventProfiler mrep = new MultiResolutionEventProfiler(N_SAMPLES);
+
+            @Override
+            public void open() throws HyracksDataException {
+                openTime = System.currentTimeMillis();
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                mrep.reportEvent();
+                writer.nextFrame(buffer);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                closeTime = System.currentTimeMillis();
+                ((Task) ctx).setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext()
+                        .getJobId(), cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep));
+                writer.close();
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/DefaultDeallocatableRegistry.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/DefaultDeallocatableRegistry.java
new file mode 100644
index 0000000..863ab7b
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/DefaultDeallocatableRegistry.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.resources;
+
+import java.util.List;
+import java.util.Vector;
+
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
+
+public class DefaultDeallocatableRegistry implements IDeallocatableRegistry {
+    private final List<IDeallocatable> deallocatables;
+
+    public DefaultDeallocatableRegistry() {
+        deallocatables = new Vector<IDeallocatable>();
+    }
+
+    @Override
+    public void registerDeallocatable(IDeallocatable deallocatable) {
+        deallocatables.add(deallocatable);
+    }
+
+    public void close() {
+        for (IDeallocatable d : deallocatables) {
+            try {
+                d.deallocate();
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
new file mode 100644
index 0000000..4651149
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.runtime;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class RootHyracksContext implements IHyracksRootContext {
+    private final NodeControllerService ncs;
+
+    private final IIOManager ioManager;
+
+    public RootHyracksContext(NodeControllerService ncs, IIOManager ioManager) {
+        this.ncs = ncs;
+        this.ioManager = ioManager;
+    }
+
+    @Override
+    public IIOManager getIOManager() {
+        return ioManager;
+    }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+        return ncs.getNodeControllersInfo();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
new file mode 100644
index 0000000..8f8c032
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class AbortTasksWork extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(AbortTasksWork.class.getName());
+
+    private final NodeControllerService ncs;
+
+    private final JobId jobId;
+
+    private final List<TaskAttemptId> tasks;
+
+    public AbortTasksWork(NodeControllerService ncs, JobId jobId, List<TaskAttemptId> tasks) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+        this.tasks = tasks;
+    }
+
+    @Override
+    public void run() {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
+        }
+        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+        Joblet ji = jobletMap.get(jobId);
+        if (ji != null) {
+            Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
+            for (TaskAttemptId taId : tasks) {
+                Task task = taskMap.get(taId);
+                if (task != null) {
+                    task.abort();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
new file mode 100644
index 0000000..deb1b75
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.messages.IMessage;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+/**
+ * @author rico
+ * 
+ */
+public class ApplicationMessageWork extends AbstractWork {
+
+    private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
+    private byte[] message;
+    private String nodeId;
+    private NodeControllerService ncs;
+    private String appName;
+
+    public ApplicationMessageWork(NodeControllerService ncs, byte[] message, String appName, String nodeId) {
+        this.ncs = ncs;
+        this.nodeId = nodeId;
+        this.message = message;
+        this.appName = appName;
+    }
+
+    @Override
+    public void run() {
+
+        NCApplicationContext ctx = ncs.getApplications().get(appName);
+        try {
+            IMessage data = (IMessage) ctx.deserialize(message);
+            if (ctx.getMessageBroker() != null) {
+                ctx.getMessageBroker().receivedMessage(data, nodeId);
+            } else {
+                LOGGER.log(Level.WARNING, "Messsage was sent, but no Message Broker set!");
+            }
+        } catch (IOException e) {
+            Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
+        } catch (ClassNotFoundException e) {
+            Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "nodeID: " + nodeId;
+    }
+
+}
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
new file mode 100644
index 0000000..574bc6d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class BuildJobProfilesWork extends SynchronizableWork {
+    private final NodeControllerService ncs;
+
+    private final FutureValue<List<JobProfile>> fv;
+
+    public BuildJobProfilesWork(NodeControllerService ncs, FutureValue<List<JobProfile>> fv) {
+        this.ncs = ncs;
+        this.fv = fv;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        List<JobProfile> profiles = new ArrayList<JobProfile>();
+        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+        for (Joblet ji : jobletMap.values()) {
+            profiles.add(new JobProfile(ji.getJobId()));
+        }
+        for (JobProfile jProfile : profiles) {
+            Joblet ji;
+            JobletProfile jobletProfile = new JobletProfile(ncs.getId());
+            ji = jobletMap.get(jProfile.getJobId());
+            if (ji != null) {
+                ji.dumpProfile(jobletProfile);
+                jProfile.getJobletProfiles().put(ncs.getId(), jobletProfile);
+            }
+        }
+        fv.setValue(profiles);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
new file mode 100644
index 0000000..173ab92
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class CleanupJobletWork extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(CleanupJobletWork.class.getName());
+
+    private final NodeControllerService ncs;
+
+    private final JobId jobId;
+
+    private JobStatus status;
+
+    public CleanupJobletWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+        this.status = status;
+    }
+
+    @Override
+    public void run() {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Cleaning up after job: " + jobId);
+        }
+        final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
+        ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
+        ncs.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                for (IPartition p : unregisteredPartitions) {
+                    p.deallocate();
+                }
+            }
+        });
+        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+        Joblet joblet = jobletMap.remove(jobId);
+        if (joblet != null) {
+            joblet.cleanup(status);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
new file mode 100644
index 0000000..6eb1a95
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+public class CreateApplicationWork extends AbstractWork {
+    private final NodeControllerService ncs;
+
+    private final String appName;
+
+    private final boolean deployHar;
+
+    private final byte[] serializedDistributedState;
+
+    public CreateApplicationWork(NodeControllerService ncs, String appName, boolean deployHar,
+            byte[] serializedDistributedState) {
+        this.ncs = ncs;
+        this.appName = appName;
+        this.deployHar = deployHar;
+        this.serializedDistributedState = serializedDistributedState;
+    }
+
+    @Override
+    public void run() {
+        try {
+            NCApplicationContext appCtx;
+            Map<String, NCApplicationContext> applications = ncs.getApplications();
+            if (applications.containsKey(appName)) {
+                throw new HyracksException("Duplicate application with name: " + appName + " being created.");
+            }
+            appCtx = new NCApplicationContext(ncs.getServerContext(), ncs.getRootContext(), appName, ncs.getId());
+            applications.put(appName, appCtx);
+            if (deployHar) {
+                NCConfig ncConfig = ncs.getConfiguration();
+                NodeParameters nodeParameters = ncs.getNodeParameters();
+                HttpClient hc = new DefaultHttpClient();
+                HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
+                        + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
+                HttpResponse response = hc.execute(get);
+                InputStream is = response.getEntity().getContent();
+                OutputStream os = appCtx.getHarOutputStream();
+                try {
+                    IOUtils.copyLarge(is, os);
+                } finally {
+                    os.close();
+                    is.close();
+                }
+            }
+            appCtx.initializeClassPath();
+            appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
+            appCtx.initialize();
+            ncs.getClusterController()
+                    .notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
new file mode 100644
index 0000000..b104ce8
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+public class DestroyApplicationWork extends AbstractWork {
+    private final NodeControllerService ncs;
+
+    private final String appName;
+
+    public DestroyApplicationWork(NodeControllerService ncs, String appName) {
+        this.ncs = ncs;
+        this.appName = appName;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Map<String, NCApplicationContext> applications = ncs.getApplications();
+            ApplicationContext appCtx = applications.remove(appName);
+            if (appCtx != null) {
+                appCtx.deinitialize();
+            }
+            ncs.getClusterController().notifyApplicationStateChange(ncs.getId(), appName,
+                    ApplicationStatus.DEINITIALIZED);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java
new file mode 100644
index 0000000..022c92f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class NotifyTaskCompleteWork extends AbstractWork {
+    private final NodeControllerService ncs;
+    private final Task task;
+
+    public NotifyTaskCompleteWork(NodeControllerService ncs, Task task) {
+        this.ncs = ncs;
+        this.task = task;
+    }
+
+    @Override
+    public void run() {
+        TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
+        task.dumpProfile(taskProfile);
+        try {
+            ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
+                    ncs.getId(), taskProfile);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        task.getJoblet().removeTask(task);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
new file mode 100644
index 0000000..3957934
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class NotifyTaskFailureWork extends AbstractWork {
+    private final NodeControllerService ncs;
+    private final Task task;
+    private final String details;
+
+    public NotifyTaskFailureWork(NodeControllerService ncs, Task task, String details) {
+        this.ncs = ncs;
+        this.task = task;
+        this.details = details;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
+                    ncs.getId(), details);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        task.getJoblet().removeTask(task);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
new file mode 100644
index 0000000..bb9669d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
+
+public class ReportPartitionAvailabilityWork extends AbstractWork {
+    private final NodeControllerService ncs;
+
+    private final PartitionId pid;
+
+    private final NetworkAddress networkAddress;
+
+    public ReportPartitionAvailabilityWork(NodeControllerService ncs, PartitionId pid, NetworkAddress networkAddress) {
+        this.ncs = ncs;
+        this.pid = pid;
+        this.networkAddress = networkAddress;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+            Joblet ji = jobletMap.get(pid.getJobId());
+            if (ji != null) {
+                PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getNetworkManager(),
+                        new InetSocketAddress(InetAddress.getByAddress(networkAddress.getIpAddress()),
+                                networkAddress.getPort()), pid, 5));
+                ji.reportPartitionAvailability(channel);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
new file mode 100644
index 0000000..0c0fa3d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
+import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
+import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
+import edu.uci.ics.hyracks.control.nc.profiling.ProfilingPartitionWriterFactory;
+
+public class StartTasksWork extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(StartTasksWork.class.getName());
+
+    private final NodeControllerService ncs;
+
+    private final String appName;
+
+    private final JobId jobId;
+
+    private final byte[] acgBytes;
+
+    private final List<TaskAttemptDescriptor> taskDescriptors;
+
+    private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap;
+
+    private final EnumSet<JobFlag> flags;
+
+    public StartTasksWork(NodeControllerService ncs, String appName, JobId jobId, byte[] acgBytes,
+            List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, EnumSet<JobFlag> flags) {
+        this.ncs = ncs;
+        this.appName = appName;
+        this.jobId = jobId;
+        this.acgBytes = acgBytes;
+        this.taskDescriptors = taskDescriptors;
+        this.connectorPoliciesMap = connectorPoliciesMap;
+        this.flags = flags;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Map<String, NCApplicationContext> applications = ncs.getApplications();
+            NCApplicationContext appCtx = applications.get(appName);
+            final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, acgBytes == null ? null
+                    : (ActivityClusterGraph) appCtx.deserialize(acgBytes));
+            final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
+
+            IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
+                @Override
+                public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
+                    ActivityCluster ac = acg.getActivityMap().get(aid);
+                    IConnectorDescriptor conn = ac.getActivityOutputMap().get(aid).get(outputIndex);
+                    return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                }
+
+                @Override
+                public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
+                    ActivityCluster ac = acg.getActivityMap().get(aid);
+                    IConnectorDescriptor conn = ac.getActivityInputMap().get(aid).get(inputIndex);
+                    return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                }
+            };
+
+            for (TaskAttemptDescriptor td : taskDescriptors) {
+                TaskAttemptId taId = td.getTaskAttemptId();
+                TaskId tid = taId.getTaskId();
+                ActivityId aid = tid.getActivityId();
+                ActivityCluster ac = acg.getActivityMap().get(aid);
+                IActivity han = ac.getActivityMap().get(aid);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Initializing " + taId + " -> " + han);
+                }
+                final int partition = tid.getPartition();
+                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs);
+                IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
+
+                List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
+
+                List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
+                if (inputs != null) {
+                    for (int i = 0; i < inputs.size(); ++i) {
+                        IConnectorDescriptor conn = inputs.get(i);
+                        IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("input: " + i + ": " + conn.getConnectorId());
+                        }
+                        RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                        IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
+                                recordDesc, cPolicy);
+                        collectors.add(collector);
+                    }
+                }
+                List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
+                if (outputs != null) {
+                    for (int i = 0; i < outputs.size(); ++i) {
+                        final IConnectorDescriptor conn = outputs.get(i);
+                        RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                        IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+
+                        IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
+                                partition, taId, flags);
+
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("output: " + i + ": " + conn.getConnectorId());
+                        }
+                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+                                td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        operator.setOutputFrameWriter(i, writer, recordDesc);
+                    }
+                }
+
+                task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
+                joblet.addTask(task);
+
+                task.start();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, ActivityClusterGraph acg)
+            throws Exception {
+        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+        Joblet ji = jobletMap.get(jobId);
+        if (ji == null) {
+            if (acg == null) {
+                throw new NullPointerException("JobActivityGraph was null");
+            }
+            ji = new Joblet(ncs, jobId, appCtx, acg);
+            jobletMap.put(jobId, ji);
+        }
+        return ji;
+    }
+
+    private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
+            int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
+            throws HyracksDataException {
+        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+                td.getInputPartitionCounts()[i], td.getPartitionCount());
+        if (cPolicy.materializeOnReceiveSide()) {
+            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
+                    task.getTaskAttemptId(), ncs.getExecutor());
+        } else {
+            return collector;
+        }
+    }
+
+    private IPartitionWriterFactory createPartitionWriterFactory(final IHyracksTaskContext ctx,
+            IConnectorPolicy cPolicy, final JobId jobId, final IConnectorDescriptor conn, final int senderIndex,
+            final TaskAttemptId taId, EnumSet<JobFlag> flags) {
+        IPartitionWriterFactory factory;
+        if (cPolicy.materializeOnSendSide()) {
+            if (cPolicy.consumerWaitsForProducerToFinish()) {
+                factory = new IPartitionWriterFactory() {
+                    @Override
+                    public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+                        return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
+                                conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+                    }
+                };
+            } else {
+                factory = new IPartitionWriterFactory() {
+                    @Override
+                    public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+                        return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
+                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+                    }
+                };
+            }
+        } else {
+            factory = new IPartitionWriterFactory() {
+                @Override
+                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
+                            conn.getConnectorId(), senderIndex, receiverIndex), taId);
+                }
+            };
+        }
+        if (flags.contains(JobFlag.PROFILE_RUNTIME)) {
+            factory = new ProfilingPartitionWriterFactory(ctx, conn, senderIndex, factory);
+        }
+        return factory;
+    }
+}
\ No newline at end of file