Copied hyracks trunk into fullstack

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1958 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks/hyracks-control/hyracks-control-common/pom.xml
new file mode 100644
index 0000000..8e20020
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -0,0 +1,42 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-control-common</artifactId>
+  <version>0.2.2-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks-control</artifactId>
+    <version>0.2.2-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-api</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>commons-io</groupId>
+  		<artifactId>commons-io</artifactId>
+  		<version>1.4</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+</project>
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
new file mode 100644
index 0000000..21e3e9d
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common;
+
+import edu.uci.ics.hyracks.control.common.service.IService;
+
+public abstract class AbstractRemoteService implements IService {
+    public AbstractRemoteService() {
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
new file mode 100644
index 0000000..fc755a5
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.application;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Properties;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+
+import edu.uci.ics.hyracks.api.application.IApplicationContext;
+import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+
+public abstract class ApplicationContext implements IApplicationContext {
+    private static final String APPLICATION_ROOT = "applications";
+    private static final String CLUSTER_CONTROLLER_BOOTSTRAP_CLASS_KEY = "cc.bootstrap.class";
+    private static final String NODE_CONTROLLER_BOOTSTRAP_CLASS_KEY = "nc.bootstrap.class";
+
+    protected ServerContext serverCtx;
+    protected final String appName;
+    protected final File applicationRootDir;
+    protected ClassLoader classLoader;
+    protected ApplicationStatus status;
+    protected Properties deploymentDescriptor;
+    protected IBootstrap bootstrap;
+    protected Serializable distributedState;
+    protected IMessageBroker messageBroker;
+
+    public ApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+        this.serverCtx = serverCtx;
+        this.appName = appName;
+        this.applicationRootDir = new File(new File(serverCtx.getBaseDir(), APPLICATION_ROOT), appName);
+        FileUtils.deleteDirectory(applicationRootDir);
+        applicationRootDir.mkdirs();
+    }
+
+    public String getApplicationName() {
+        return appName;
+    }
+
+    public void initializeClassPath() throws Exception {
+        if (expandArchive()) {
+            File expandedFolder = getExpandedFolder();
+            List<URL> urls = new ArrayList<URL>();
+            findJarFiles(expandedFolder, urls);
+            Collections.sort(urls, new Comparator<URL>() {
+                @Override
+                public int compare(URL o1, URL o2) {
+                    return o1.getFile().compareTo(o2.getFile());
+                }
+            });
+            classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]));
+        } else {
+            classLoader = getClass().getClassLoader();
+        }
+        deploymentDescriptor = parseDeploymentDescriptor();
+    }
+
+    public void initialize() throws Exception {
+        if (deploymentDescriptor != null) {
+            String bootstrapClass = null;
+            switch (serverCtx.getServerType()) {
+                case CLUSTER_CONTROLLER: {
+                    bootstrapClass = deploymentDescriptor.getProperty(CLUSTER_CONTROLLER_BOOTSTRAP_CLASS_KEY);
+                    break;
+                }
+                case NODE_CONTROLLER: {
+                    bootstrapClass = deploymentDescriptor.getProperty(NODE_CONTROLLER_BOOTSTRAP_CLASS_KEY);
+                    break;
+                }
+            }
+            if (bootstrapClass != null) {
+                bootstrap = (IBootstrap) classLoader.loadClass(bootstrapClass).newInstance();
+                start();
+            }
+        }
+    }
+
+    protected abstract void start() throws Exception;
+
+    protected abstract void stop() throws Exception;
+
+    private void findJarFiles(File dir, List<URL> urls) throws MalformedURLException {
+        for (File f : dir.listFiles()) {
+            if (f.isDirectory()) {
+                findJarFiles(f, urls);
+            } else if (f.getName().endsWith(".jar") || f.getName().endsWith(".zip")) {
+                urls.add(f.toURI().toURL());
+            }
+        }
+    }
+
+    private Properties parseDeploymentDescriptor() throws IOException {
+        InputStream in = classLoader.getResourceAsStream("hyracks-deployment.properties");
+        Properties props = new Properties();
+        if (in != null) {
+            try {
+                props.load(in);
+            } finally {
+                in.close();
+            }
+        }
+        return props;
+    }
+
+    private boolean expandArchive() throws IOException {
+        File archiveFile = getArchiveFile();
+        if (archiveFile.exists()) {
+            File expandedFolder = getExpandedFolder();
+            FileUtils.deleteDirectory(expandedFolder);
+            ZipFile zf = new ZipFile(archiveFile);
+            for (Enumeration<? extends ZipEntry> i = zf.entries(); i.hasMoreElements();) {
+                ZipEntry ze = i.nextElement();
+                String name = ze.getName();
+                if (name.endsWith("/")) {
+                    continue;
+                }
+                InputStream is = zf.getInputStream(ze);
+                OutputStream os = FileUtils.openOutputStream(new File(expandedFolder, name));
+                try {
+                    IOUtils.copyLarge(is, os);
+                } finally {
+                    os.close();
+                    is.close();
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private File getExpandedFolder() {
+        return new File(applicationRootDir, "expanded");
+    }
+
+    public void deinitialize() throws Exception {
+        stop();
+        File expandedFolder = getExpandedFolder();
+        FileUtils.deleteDirectory(expandedFolder);
+    }
+
+    public Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+        return JavaSerializationUtils.deserialize(bytes, classLoader);
+    }
+
+    public OutputStream getHarOutputStream() throws IOException {
+        return new FileOutputStream(getArchiveFile());
+    }
+
+    private File getArchiveFile() {
+        return new File(applicationRootDir, "application.har");
+    }
+
+    public InputStream getHarInputStream() throws IOException {
+        return new FileInputStream(getArchiveFile());
+    }
+
+    public boolean containsHar() {
+        return getArchiveFile().exists();
+    }
+
+    @Override
+    public Serializable getDistributedState() {
+        return distributedState;
+    }
+
+    @Override
+    public ClassLoader getClassLoader() {
+        return classLoader;
+    }
+
+    public void setStatus(ApplicationStatus status) {
+        this.status = status;
+    }
+
+    public ApplicationStatus getStatus() {
+        return status;
+    }
+
+    @Override
+    public void setMessageBroker(IMessageBroker messageBroker) {
+        this.messageBroker = messageBroker;
+    }
+
+    @Override
+    public IMessageBroker getMessageBroker() {
+        return this.messageBroker;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java
new file mode 100644
index 0000000..b45c4f1
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.application;
+
+public enum ApplicationStatus {
+    CREATED,
+    IN_INITIALIZATION,
+    INITIALIZED,
+    IN_DEINITIALIZATION,
+    DEINITIALIZED
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
new file mode 100644
index 0000000..a25250a
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.base;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.messages.IMessage;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+
+public interface IClusterController {
+    public void registerNode(NodeRegistration reg) throws Exception;
+
+    public void unregisterNode(String nodeId) throws Exception;
+
+    public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+            throws Exception;
+
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception;
+
+    public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
+
+    public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
+
+    public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
+
+    public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
+
+    public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
+
+    public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
+
+    public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
new file mode 100644
index 0000000..049adf8
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.base;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+
+public interface INodeController {
+    public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
+
+    public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
+
+    public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
+
+    public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+            throws Exception;
+
+    public void destroyApplication(String appName) throws Exception;
+
+    public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/context/ServerContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/context/ServerContext.java
new file mode 100644
index 0000000..7eca794
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/context/ServerContext.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.context;
+
+import java.io.File;
+
+public class ServerContext {
+    public enum ServerType {
+        CLUSTER_CONTROLLER,
+        NODE_CONTROLLER,
+    }
+
+    private final ServerType type;
+    private final File baseDir;
+
+    public ServerContext(ServerType type, File baseDir) throws Exception {
+        this.type = type;
+        this.baseDir = baseDir;
+    }
+
+    public ServerType getServerType() {
+        return type;
+    }
+
+    public File getBaseDir() {
+        return baseDir;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
new file mode 100644
index 0000000..6c208fe
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.controllers;
+
+import java.io.File;
+import java.util.List;
+
+import org.kohsuke.args4j.Option;
+
+public class CCConfig {
+    @Option(name = "-client-net-ip-address", usage = "Sets the IP Address to listen for connections from clients", required = true)
+    public String clientNetIpAddress;
+
+    @Option(name = "-client-net-port", usage = "Sets the port to listen for connections from clients (default 1098)")
+    public int clientNetPort = 1098;
+
+    @Option(name = "-cluster-net-ip-address", usage = "Sets the IP Address to listen for connections from ", required = true)
+    public String clusterNetIpAddress;
+
+    @Option(name = "-cluster-net-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
+    public int clusterNetPort = 1099;
+
+    @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 19001)")
+    public int httpPort = 16001;
+
+    @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
+    public int heartbeatPeriod = 10000;
+
+    @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
+    public int maxHeartbeatLapsePeriods = 5;
+
+    @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0)")
+    public int profileDumpPeriod = 0;
+
+    @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not specified in the job specification. (default: 5)")
+    public int defaultMaxJobAttempts = 5;
+
+    @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to the specified value. (default: 10)")
+    public int jobHistorySize = 10;
+
+    @Option(name = "-cc-root", usage = "Sets the root folder used for file operations. (default: ClusterControllerService)")
+    public String ccRoot = "ClusterControllerService";
+
+    @Option(name = "-cluster-topology", usage = "Sets the XML file that defines the cluster topology. (default: null)")
+    public File clusterTopologyDefinition = null;
+
+    public void toCommandLine(List<String> cList) {
+        cList.add("-client-net-ip-address");
+        cList.add(clientNetIpAddress);
+        cList.add("-client-net-port");
+        cList.add(String.valueOf(clientNetPort));
+        cList.add("-cluster-net-ip-address");
+        cList.add(clusterNetIpAddress);
+        cList.add("-cluster-net-port");
+        cList.add(String.valueOf(clusterNetPort));
+        cList.add("-http-port");
+        cList.add(String.valueOf(httpPort));
+        cList.add("-heartbeat-period");
+        cList.add(String.valueOf(heartbeatPeriod));
+        cList.add("-max-heartbeat-lapse-periods");
+        cList.add(String.valueOf(maxHeartbeatLapsePeriods));
+        cList.add("-profile-dump-period");
+        cList.add(String.valueOf(profileDumpPeriod));
+        cList.add("-default-max-job-attempts");
+        cList.add(String.valueOf(defaultMaxJobAttempts));
+        cList.add("-job-history-size");
+        cList.add(String.valueOf(jobHistorySize));
+        cList.add("-cc-root");
+        cList.add(ccRoot);
+        if (clusterTopologyDefinition != null) {
+            cList.add("-cluster-topology");
+            cList.add(clusterTopologyDefinition.getAbsolutePath());
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
new file mode 100644
index 0000000..167eb4b
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.controllers;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.kohsuke.args4j.Option;
+
+public class NCConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
+    public String ccHost;
+
+    @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
+    public int ccPort = 1099;
+
+    @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
+    public String clusterNetIPAddress;
+
+    @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
+    public String nodeId;
+
+    @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
+    public String dataIPAddress;
+
+    @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
+    public String ioDevices = System.getProperty("java.io.tmpdir");
+
+    @Option(name = "-dcache-client-servers", usage = "Sets the list of DCache servers in the format host1:port1,host2:port2,... (default localhost:54583)")
+    public String dcacheClientServers = "localhost:54583";
+
+    @Option(name = "-dcache-client-server-local", usage = "Sets the local DCache server, if one is available, in the format host:port (default not set)")
+    public String dcacheClientServerLocal;
+
+    @Option(name = "-dcache-client-path", usage = "Sets the path to store the files retrieved from the DCache server (default /tmp/dcache-client)")
+    public String dcacheClientPath = "/tmp/dcache-client";
+
+    @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
+    public int nNetThreads = 1;
+
+    @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
+    public int maxMemory = -1;
+
+    public void toCommandLine(List<String> cList) {
+        cList.add("-cc-host");
+        cList.add(ccHost);
+        cList.add("-cc-port");
+        cList.add(String.valueOf(ccPort));
+        cList.add("-cluster-net-ip-address");
+        cList.add(clusterNetIPAddress);
+        cList.add("-node-id");
+        cList.add(nodeId);
+        cList.add("-data-ip-address");
+        cList.add(dataIPAddress);
+        cList.add("-iodevices");
+        cList.add(ioDevices);
+        cList.add("-dcache-client-servers");
+        cList.add(dcacheClientServers);
+        if (dcacheClientServerLocal != null) {
+            cList.add("-dcache-client-server-local");
+            cList.add(dcacheClientServerLocal);
+        }
+        cList.add("-dcache-client-path");
+        cList.add(dcacheClientPath);
+        cList.add("-net-thread-count");
+        cList.add(String.valueOf(nNetThreads));
+        cList.add("-max-memory");
+        cList.add(String.valueOf(maxMemory));
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
new file mode 100644
index 0000000..0161f96
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.controllers;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
+
+public class NodeParameters implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private ClusterControllerInfo ccInfo;
+
+    private int heartbeatPeriod;
+
+    private int profileDumpPeriod;
+
+    public ClusterControllerInfo getClusterControllerInfo() {
+        return ccInfo;
+    }
+
+    public void setClusterControllerInfo(ClusterControllerInfo ccInfo) {
+        this.ccInfo = ccInfo;
+    }
+
+    public int getHeartbeatPeriod() {
+        return heartbeatPeriod;
+    }
+
+    public void setHeartbeatPeriod(int heartbeatPeriod) {
+        this.heartbeatPeriod = heartbeatPeriod;
+    }
+
+    public int getProfileDumpPeriod() {
+        return profileDumpPeriod;
+    }
+
+    public void setProfileDumpPeriod(int profileDumpPeriod) {
+        this.profileDumpPeriod = profileDumpPeriod;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
new file mode 100644
index 0000000..91cfecf
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.controllers;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+
+public final class NodeRegistration implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final InetSocketAddress ncAddress;
+
+    private final String nodeId;
+
+    private final NCConfig ncConfig;
+
+    private final NetworkAddress dataPort;
+
+    private final String osName;
+
+    private final String arch;
+
+    private final String osVersion;
+
+    private final int nProcessors;
+
+    private final String vmName;
+
+    private final String vmVersion;
+
+    private final String vmVendor;
+
+    private final String classpath;
+
+    private final String libraryPath;
+
+    private final String bootClasspath;
+
+    private final List<String> inputArguments;
+
+    private final Map<String, String> systemProperties;
+
+    private final HeartbeatSchema hbSchema;
+
+    public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
+            String osName, String arch, String osVersion, int nProcessors, String vmName, String vmVersion,
+            String vmVendor, String classpath, String libraryPath, String bootClasspath, List<String> inputArguments,
+            Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
+        this.ncAddress = ncAddress;
+        this.nodeId = nodeId;
+        this.ncConfig = ncConfig;
+        this.dataPort = dataPort;
+        this.osName = osName;
+        this.arch = arch;
+        this.osVersion = osVersion;
+        this.nProcessors = nProcessors;
+        this.vmName = vmName;
+        this.vmVersion = vmVersion;
+        this.vmVendor = vmVendor;
+        this.classpath = classpath;
+        this.libraryPath = libraryPath;
+        this.bootClasspath = bootClasspath;
+        this.inputArguments = inputArguments;
+        this.systemProperties = systemProperties;
+        this.hbSchema = hbSchema;
+    }
+
+    public InetSocketAddress getNodeControllerAddress() {
+        return ncAddress;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public NCConfig getNCConfig() {
+        return ncConfig;
+    }
+
+    public NetworkAddress getDataPort() {
+        return dataPort;
+    }
+
+    public String getOSName() {
+        return osName;
+    }
+
+    public String getArch() {
+        return arch;
+    }
+
+    public String getOSVersion() {
+        return osVersion;
+    }
+
+    public int getNProcessors() {
+        return nProcessors;
+    }
+
+    public HeartbeatSchema getHeartbeatSchema() {
+        return hbSchema;
+    }
+
+    public String getVmName() {
+        return vmName;
+    }
+
+    public String getVmVersion() {
+        return vmVersion;
+    }
+
+    public String getVmVendor() {
+        return vmVendor;
+    }
+
+    public String getClasspath() {
+        return classpath;
+    }
+
+    public String getLibraryPath() {
+        return libraryPath;
+    }
+
+    public String getBootClasspath() {
+        return bootClasspath;
+    }
+
+    public List<String> getInputArguments() {
+        return inputArguments;
+    }
+
+    public Map<String, String> getSystemProperties() {
+        return systemProperties;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
new file mode 100644
index 0000000..1dba3bc
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.heartbeat;
+
+import java.io.Serializable;
+
+public class HeartbeatData implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public long heapInitSize;
+    public long heapUsedSize;
+    public long heapCommittedSize;
+    public long heapMaxSize;
+    public long nonheapInitSize;
+    public long nonheapUsedSize;
+    public long nonheapCommittedSize;
+    public long nonheapMaxSize;
+    public int threadCount;
+    public int peakThreadCount;
+    public long totalStartedThreadCount;
+    public double systemLoadAverage;
+    public long[] gcCollectionCounts;
+    public long[] gcCollectionTimes;
+    public long netPayloadBytesRead;
+    public long netPayloadBytesWritten;
+    public long netSignalingBytesRead;
+    public long netSignalingBytesWritten;
+    public long ipcMessagesSent;
+    public long ipcMessageBytesSent;
+    public long ipcMessagesReceived;
+    public long ipcMessageBytesReceived;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatSchema.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatSchema.java
new file mode 100644
index 0000000..e53e9a8
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatSchema.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.heartbeat;
+
+import java.io.Serializable;
+
+public class HeartbeatSchema implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final GarbageCollectorInfo[] gcInfos;
+
+    public HeartbeatSchema(GarbageCollectorInfo[] gcInfos) {
+        this.gcInfos = gcInfos;
+    }
+
+    public GarbageCollectorInfo[] getGarbageCollectorInfos() {
+        return gcInfos;
+    }
+
+    public static class GarbageCollectorInfo implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String name;
+
+        public GarbageCollectorInfo(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
new file mode 100644
index 0000000..8f0056f
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -0,0 +1,845 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+public class CCNCFunctions {
+    private static final Logger LOGGER = Logger.getLogger(CCNCFunctions.class.getName());
+
+    private static final int FID_CODE_SIZE = 1;
+
+    public enum FunctionId {
+        REGISTER_NODE,
+        UNREGISTER_NODE,
+        NOTIFY_JOBLET_CLEANUP,
+        NOTIFY_TASK_COMPLETE,
+        NOTIFY_TASK_FAILURE,
+        NODE_HEARTBEAT,
+        REPORT_PROFILE,
+        REGISTER_PARTITION_PROVIDER,
+        REGISTER_PARTITION_REQUEST,
+        APPLICATION_STATE_CHANGE_RESPONSE,
+
+        NODE_REGISTRATION_RESULT,
+        START_TASKS,
+        ABORT_TASKS,
+        CLEANUP_JOBLET,
+        CREATE_APPLICATION,
+        DESTROY_APPLICATION,
+        REPORT_PARTITION_AVAILABILITY,
+        SEND_APPLICATION_MESSAGE,
+
+        OTHER
+    }
+
+    public static class SendApplicationMessageFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        private byte[] serializedMessage;
+        private String nodeId;
+        private String appName;
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public void setNodeId(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        public byte[] getMessage() {
+            return serializedMessage;
+        }
+
+        public SendApplicationMessageFunction(byte[] data, String appName, String nodeId) {
+            super();
+            this.serializedMessage = data;
+            this.nodeId = nodeId;
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.SEND_APPLICATION_MESSAGE;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+    }
+
+    public static abstract class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class RegisterNodeFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final NodeRegistration reg;
+
+        public RegisterNodeFunction(NodeRegistration reg) {
+            this.reg = reg;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_NODE;
+        }
+
+        public NodeRegistration getNodeRegistration() {
+            return reg;
+        }
+    }
+
+    public static class UnregisterNodeFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+
+        public UnregisterNodeFunction(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.UNREGISTER_NODE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    public static class NotifyTaskCompleteFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final TaskAttemptId taskId;
+        private final String nodeId;
+        private final TaskProfile statistics;
+
+        public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
+            this.jobId = jobId;
+            this.taskId = taskId;
+            this.nodeId = nodeId;
+            this.statistics = statistics;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_TASK_COMPLETE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public TaskAttemptId getTaskId() {
+            return taskId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public TaskProfile getStatistics() {
+            return statistics;
+        }
+    }
+
+    public static class NotifyTaskFailureFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final TaskAttemptId taskId;
+        private final String nodeId;
+        private final String details;
+
+        public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+            this.jobId = jobId;
+            this.taskId = taskId;
+            this.nodeId = nodeId;
+            this.details = details;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_TASK_FAILURE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public TaskAttemptId getTaskId() {
+            return taskId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public String getDetails() {
+            return details;
+        }
+    }
+
+    public static class NotifyJobletCleanupFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final String nodeId;
+
+        public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
+            this.jobId = jobId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_JOBLET_CLEANUP;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    public static class NodeHeartbeatFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final HeartbeatData hbData;
+
+        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+            this.nodeId = nodeId;
+            this.hbData = hbData;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_HEARTBEAT;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public HeartbeatData getHeartbeatData() {
+            return hbData;
+        }
+    }
+
+    public static class ReportProfileFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final List<JobProfile> profiles;
+
+        public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
+            this.nodeId = nodeId;
+            this.profiles = profiles;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_PROFILE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public List<JobProfile> getProfiles() {
+            return profiles;
+        }
+    }
+
+    public static class RegisterPartitionProviderFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionDescriptor partitionDescriptor;
+
+        public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
+            this.partitionDescriptor = partitionDescriptor;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_PARTITION_PROVIDER;
+        }
+
+        public PartitionDescriptor getPartitionDescriptor() {
+            return partitionDescriptor;
+        }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            // Read PartitionId
+            PartitionId pid = readPartitionId(dis);
+
+            // Read nodeId
+            String nodeId = dis.readUTF();
+
+            // Read TaskAttemptId
+            TaskAttemptId taId = readTaskAttemptId(dis);
+
+            // Read reusable flag
+            boolean reusable = dis.readBoolean();
+
+            // Read Partition State
+            PartitionState state = readPartitionState(dis);
+
+            PartitionDescriptor pd = new PartitionDescriptor(pid, nodeId, taId, reusable);
+            pd.setState(state);
+            return new RegisterPartitionProviderFunction(pd);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            RegisterPartitionProviderFunction fn = (RegisterPartitionProviderFunction) object;
+
+            DataOutputStream dos = new DataOutputStream(out);
+
+            PartitionDescriptor pd = fn.getPartitionDescriptor();
+
+            // Write PartitionId
+            writePartitionId(dos, pd.getPartitionId());
+
+            // Write nodeId
+            dos.writeUTF(pd.getNodeId());
+
+            // Write TaskAttemptId
+            writeTaskAttemptId(dos, pd.getProducingTaskAttemptId());
+
+            // Write reusable flag
+            dos.writeBoolean(pd.isReusable());
+
+            // Write Partition State
+            writePartitionState(dos, pd.getState());
+        }
+    }
+
+    public static class RegisterPartitionRequestFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionRequest partitionRequest;
+
+        public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
+            this.partitionRequest = partitionRequest;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_PARTITION_REQUEST;
+        }
+
+        public PartitionRequest getPartitionRequest() {
+            return partitionRequest;
+        }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            // Read PartitionId
+            PartitionId pid = readPartitionId(dis);
+
+            // Read nodeId
+            String nodeId = dis.readUTF();
+
+            // Read TaskAttemptId
+            TaskAttemptId taId = readTaskAttemptId(dis);
+
+            // Read Partition State
+            PartitionState state = readPartitionState(dis);
+
+            PartitionRequest pr = new PartitionRequest(pid, nodeId, taId, state);
+            return new RegisterPartitionRequestFunction(pr);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            RegisterPartitionRequestFunction fn = (RegisterPartitionRequestFunction) object;
+
+            DataOutputStream dos = new DataOutputStream(out);
+
+            PartitionRequest pr = fn.getPartitionRequest();
+
+            // Write PartitionId
+            writePartitionId(dos, pr.getPartitionId());
+
+            // Write nodeId
+            dos.writeUTF(pr.getNodeId());
+
+            // Write TaskAttemptId
+            writeTaskAttemptId(dos, pr.getRequestingTaskAttemptId());
+
+            // Write Partition State
+            writePartitionState(dos, pr.getMinimumState());
+        }
+    }
+
+    public static class ApplicationStateChangeResponseFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final String appName;
+        private final ApplicationStatus status;
+
+        public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
+            this.nodeId = nodeId;
+            this.appName = appName;
+            this.status = status;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public String getApplicationName() {
+            return appName;
+        }
+
+        public ApplicationStatus getStatus() {
+            return status;
+        }
+    }
+
+    public static class NodeRegistrationResult extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final NodeParameters params;
+
+        private final Exception exception;
+
+        public NodeRegistrationResult(NodeParameters params, Exception exception) {
+            this.params = params;
+            this.exception = exception;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_REGISTRATION_RESULT;
+        }
+
+        public NodeParameters getNodeParameters() {
+            return params;
+        }
+
+        public Exception getException() {
+            return exception;
+        }
+    }
+
+    public static class StartTasksFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final JobId jobId;
+        private final byte[] planBytes;
+        private final List<TaskAttemptDescriptor> taskDescriptors;
+        private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+        private final EnumSet<JobFlag> flags;
+
+        public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
+                List<TaskAttemptDescriptor> taskDescriptors,
+                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) {
+            this.appName = appName;
+            this.jobId = jobId;
+            this.planBytes = planBytes;
+            this.taskDescriptors = taskDescriptors;
+            this.connectorPolicies = connectorPolicies;
+            this.flags = flags;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_TASKS;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public byte[] getPlanBytes() {
+            return planBytes;
+        }
+
+        public List<TaskAttemptDescriptor> getTaskDescriptors() {
+            return taskDescriptors;
+        }
+
+        public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
+            return connectorPolicies;
+        }
+
+        public EnumSet<JobFlag> getFlags() {
+            return flags;
+        }
+    }
+
+    public static class AbortTasksFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final List<TaskAttemptId> tasks;
+
+        public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
+            this.jobId = jobId;
+            this.tasks = tasks;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.ABORT_TASKS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public List<TaskAttemptId> getTasks() {
+            return tasks;
+        }
+    }
+
+    public static class CleanupJobletFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final JobStatus status;
+
+        public CleanupJobletFunction(JobId jobId, JobStatus status) {
+            this.jobId = jobId;
+            this.status = status;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CLEANUP_JOBLET;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public JobStatus getStatus() {
+            return status;
+        }
+    }
+
+    public static class CreateApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final boolean deployHar;
+        private final byte[] serializedDistributedState;
+
+        public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
+            this.appName = appName;
+            this.deployHar = deployHar;
+            this.serializedDistributedState = serializedDistributedState;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public boolean isDeployHar() {
+            return deployHar;
+        }
+
+        public byte[] getSerializedDistributedState() {
+            return serializedDistributedState;
+        }
+    }
+
+    public static class DestroyApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public DestroyApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class ReportPartitionAvailabilityFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionId pid;
+        private final NetworkAddress networkAddress;
+
+        public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
+            this.pid = pid;
+            this.networkAddress = networkAddress;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_PARTITION_AVAILABILITY;
+        }
+
+        public PartitionId getPartitionId() {
+            return pid;
+        }
+
+        public NetworkAddress getNetworkAddress() {
+            return networkAddress;
+        }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            // Read PartitionId
+            PartitionId pid = readPartitionId(dis);
+
+            // Read NetworkAddress
+            NetworkAddress networkAddress = readNetworkAddress(dis);
+
+            return new ReportPartitionAvailabilityFunction(pid, networkAddress);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            ReportPartitionAvailabilityFunction fn = (ReportPartitionAvailabilityFunction) object;
+
+            DataOutputStream dos = new DataOutputStream(out);
+
+            // Write PartitionId
+            writePartitionId(dos, fn.getPartitionId());
+
+            // Write NetworkAddress
+            writeNetworkAddress(dos, fn.getNetworkAddress());
+        }
+    }
+
+    public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
+        private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
+
+        public SerializerDeserializer() {
+            javaSerde = new JavaSerializationBasedPayloadSerializerDeserializer();
+        }
+
+        @Override
+        public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+            if (length < FID_CODE_SIZE) {
+                throw new IllegalStateException("Message size too small: " + length);
+            }
+            byte fid = buffer.get();
+            return deserialize(fid, buffer, length - FID_CODE_SIZE);
+        }
+
+        @Override
+        public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+            if (length < FID_CODE_SIZE) {
+                throw new IllegalStateException("Message size too small: " + length);
+            }
+            byte fid = buffer.get();
+            if (fid != FunctionId.OTHER.ordinal()) {
+                throw new IllegalStateException("Expected FID for OTHER, found: " + fid);
+            }
+            return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE);
+        }
+
+        @Override
+        public byte[] serializeObject(Object object) throws Exception {
+            if (object instanceof Function) {
+                Function fn = (Function) object;
+                return serialize(object, (byte) fn.getFunctionId().ordinal());
+            } else {
+                return serialize(object, (byte) FunctionId.OTHER.ordinal());
+            }
+        }
+
+        @Override
+        public byte[] serializeException(Exception object) throws Exception {
+            return serialize(object, (byte) FunctionId.OTHER.ordinal());
+        }
+
+        private byte[] serialize(Object object, byte fid) throws Exception {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            baos.write(fid);
+            try {
+                serialize(baos, object, fid);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Error serializing " + object, e);
+                throw e;
+            }
+            baos.close();
+            return baos.toByteArray();
+        }
+
+        private void serialize(OutputStream out, Object object, byte fid) throws Exception {
+            switch (FunctionId.values()[fid]) {
+                case REGISTER_PARTITION_PROVIDER:
+                    RegisterPartitionProviderFunction.serialize(out, object);
+                    return;
+
+                case REGISTER_PARTITION_REQUEST:
+                    RegisterPartitionRequestFunction.serialize(out, object);
+                    return;
+
+                case REPORT_PARTITION_AVAILABILITY:
+                    ReportPartitionAvailabilityFunction.serialize(out, object);
+                    return;
+            }
+            JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
+        }
+
+        private Object deserialize(byte fid, ByteBuffer buffer, int length) throws Exception {
+            switch (FunctionId.values()[fid]) {
+                case REGISTER_PARTITION_PROVIDER:
+                    return RegisterPartitionProviderFunction.deserialize(buffer, length);
+
+                case REGISTER_PARTITION_REQUEST:
+                    return RegisterPartitionRequestFunction.deserialize(buffer, length);
+
+                case REPORT_PARTITION_AVAILABILITY:
+                    return ReportPartitionAvailabilityFunction.deserialize(buffer, length);
+            }
+
+            return javaSerde.deserializeObject(buffer, length);
+        }
+    }
+
+    private static PartitionId readPartitionId(DataInputStream dis) throws IOException {
+        long jobId = dis.readLong();
+        int cdid = dis.readInt();
+        int senderIndex = dis.readInt();
+        int receiverIndex = dis.readInt();
+        PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
+        return pid;
+    }
+
+    private static void writePartitionId(DataOutputStream dos, PartitionId pid) throws IOException {
+        dos.writeLong(pid.getJobId().getId());
+        dos.writeInt(pid.getConnectorDescriptorId().getId());
+        dos.writeInt(pid.getSenderIndex());
+        dos.writeInt(pid.getReceiverIndex());
+    }
+
+    private static TaskAttemptId readTaskAttemptId(DataInputStream dis) throws IOException {
+        int odid = dis.readInt();
+        int aid = dis.readInt();
+        int partition = dis.readInt();
+        int attempt = dis.readInt();
+        TaskAttemptId taId = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid),
+                partition), attempt);
+        return taId;
+    }
+
+    private static void writeTaskAttemptId(DataOutputStream dos, TaskAttemptId taId) throws IOException {
+        TaskId tid = taId.getTaskId();
+        ActivityId aid = tid.getActivityId();
+        OperatorDescriptorId odId = aid.getOperatorDescriptorId();
+        dos.writeInt(odId.getId());
+        dos.writeInt(aid.getLocalId());
+        dos.writeInt(tid.getPartition());
+        dos.writeInt(taId.getAttempt());
+    }
+
+    private static PartitionState readPartitionState(DataInputStream dis) throws IOException {
+        PartitionState state = PartitionState.values()[dis.readInt()];
+        return state;
+    }
+
+    private static void writePartitionState(DataOutputStream dos, PartitionState state) throws IOException {
+        dos.writeInt(state.ordinal());
+    }
+
+    private static NetworkAddress readNetworkAddress(DataInputStream dis) throws IOException {
+        int bLen = dis.readInt();
+        byte[] ipAddress = new byte[bLen];
+        dis.read(ipAddress);
+        int port = dis.readInt();
+        NetworkAddress networkAddress = new NetworkAddress(ipAddress, port);
+        return networkAddress;
+    }
+
+    private static void writeNetworkAddress(DataOutputStream dos, NetworkAddress networkAddress) throws IOException {
+        byte[] ipAddress = networkAddress.getIpAddress();
+        dos.writeInt(ipAddress.length);
+        dos.write(ipAddress);
+        dos.writeInt(networkAddress.getPort());
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
new file mode 100644
index 0000000..d789768
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+
+public class ClusterControllerRemoteProxy implements IClusterController {
+    private final IIPCHandle ipcHandle;
+
+    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    @Override
+    public void registerNode(NodeRegistration reg) throws Exception {
+        CCNCFunctions.RegisterNodeFunction fn = new CCNCFunctions.RegisterNodeFunction(reg);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void unregisterNode(String nodeId) throws Exception {
+        CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+            throws Exception {
+        CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId,
+                nodeId, statistics);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
+        CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
+                details);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
+        CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
+        CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id, hbData);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
+        CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id, profiles);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
+        CCNCFunctions.RegisterPartitionProviderFunction fn = new CCNCFunctions.RegisterPartitionProviderFunction(
+                partitionDescriptor);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
+        CCNCFunctions.RegisterPartitionRequestFunction fn = new CCNCFunctions.RegisterPartitionRequestFunction(
+                partitionRequest);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
+        CCNCFunctions.ApplicationStateChangeResponseFunction fn = new CCNCFunctions.ApplicationStateChangeResponseFunction(
+                nodeId, appName, status);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
+        CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, appName, nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
new file mode 100644
index 0000000..10c0a7c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+
+public class NodeControllerRemoteProxy implements INodeController {
+    private final IIPCHandle ipcHandle;
+
+    public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    @Override
+    public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception {
+        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId, planBytes,
+                taskDescriptors, connectorPolicies, flags);
+        ipcHandle.send(-1, stf, null);
+    }
+
+    @Override
+    public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
+        CCNCFunctions.AbortTasksFunction atf = new CCNCFunctions.AbortTasksFunction(jobId, tasks);
+        ipcHandle.send(-1, atf, null);
+    }
+
+    @Override
+    public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
+        CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId, status);
+        ipcHandle.send(-1, cjf, null);
+    }
+
+    @Override
+    public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+            throws Exception {
+        CCNCFunctions.CreateApplicationFunction caf = new CCNCFunctions.CreateApplicationFunction(appName, deployHar,
+                serializedDistributedState);
+        ipcHandle.send(-1, caf, null);
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        CCNCFunctions.DestroyApplicationFunction daf = new CCNCFunctions.DestroyApplicationFunction(appName);
+        ipcHandle.send(-1, daf, null);
+    }
+
+    @Override
+    public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
+        CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new CCNCFunctions.ReportPartitionAvailabilityFunction(
+                pid, networkAddress);
+        ipcHandle.send(-1, rpaf, null);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
new file mode 100644
index 0000000..9fc0916
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionDescriptor implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final PartitionId pid;
+
+    private final String nodeId;
+
+    private final TaskAttemptId producingTaskAttemptId;
+
+    private final boolean reusable;
+
+    private PartitionState state;
+
+    public PartitionDescriptor(PartitionId pid, String nodeId, TaskAttemptId producingTaskAttemptId, boolean reusable) {
+        this.pid = pid;
+        this.nodeId = nodeId;
+        this.producingTaskAttemptId = producingTaskAttemptId;
+        this.reusable = reusable;
+    }
+
+    public PartitionId getPartitionId() {
+        return pid;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public TaskAttemptId getProducingTaskAttemptId() {
+        return producingTaskAttemptId;
+    }
+
+    public PartitionState getState() {
+        return state;
+    }
+
+    public void setState(PartitionState state) {
+        this.state = state;
+    }
+
+    public boolean isReusable() {
+        return reusable;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + pid + ":" + nodeId + ":" + producingTaskAttemptId + (reusable ? "reusable" : "non-reusable") + " "
+                + state + "]";
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java
new file mode 100644
index 0000000..ca34501
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionRequest implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final PartitionId pid;
+
+    private final String requestingNodeId;
+
+    private final TaskAttemptId requestingTaskAttemptId;
+
+    private final PartitionState minState;
+
+    public PartitionRequest(PartitionId pid, String requestingNodeId, TaskAttemptId requestingTaskAttemptId,
+            PartitionState minState) {
+        this.pid = pid;
+        this.requestingNodeId = requestingNodeId;
+        this.requestingTaskAttemptId = requestingTaskAttemptId;
+        this.minState = minState;
+    }
+
+    public PartitionId getPartitionId() {
+        return pid;
+    }
+
+    public String getNodeId() {
+        return requestingNodeId;
+    }
+
+    public TaskAttemptId getRequestingTaskAttemptId() {
+        return requestingTaskAttemptId;
+    }
+
+    public PartitionState getMinimumState() {
+        return minState;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + pid + ":" + requestingNodeId + ":" + requestingTaskAttemptId + ":" + minState + "]";
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
new file mode 100644
index 0000000..edde0b8
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+public enum PartitionState {
+    STARTED,
+    COMMITTED;
+
+    public boolean isAtLeast(PartitionState minState) {
+        switch (this) {
+            case COMMITTED:
+                return true;
+
+            case STARTED:
+                return minState == STARTED;
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
new file mode 100644
index 0000000..f6d1f78
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+
+public class TaskAttemptDescriptor implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final TaskAttemptId taId;
+
+    private final int nPartitions;
+
+    private final int[] nInputPartitions;
+
+    private final int[] nOutputPartitions;
+
+    private NetworkAddress[][] inputPartitionLocations;
+
+    public TaskAttemptDescriptor(TaskAttemptId taId, int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
+        this.taId = taId;
+        this.nPartitions = nPartitions;
+        this.nInputPartitions = nInputPartitions;
+        this.nOutputPartitions = nOutputPartitions;
+    }
+
+    public TaskAttemptId getTaskAttemptId() {
+        return taId;
+    }
+
+    public int getPartitionCount() {
+        return nPartitions;
+    }
+
+    public int[] getInputPartitionCounts() {
+        return nInputPartitions;
+    }
+
+    public int[] getOutputPartitionCounts() {
+        return nOutputPartitions;
+    }
+
+    public void setInputPartitionLocations(NetworkAddress[][] inputPartitionLocations) {
+        this.inputPartitionLocations = inputPartitionLocations;
+    }
+
+    public NetworkAddress[][] getInputPartitionLocations() {
+        return inputPartitionLocations;
+    }
+
+    @Override
+    public String toString() {
+        return "TaskAttemptDescriptor[taId = " + taId + ", nPartitions = " + nPartitions + ", nInputPartitions = "
+                + Arrays.toString(nInputPartitions) + ", nOutputPartitions = " + Arrays.toString(nOutputPartitions)
+                + "]";
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
new file mode 100644
index 0000000..8529814
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+
+public class Counter implements ICounter {
+    private final String name;
+    private final AtomicLong counter;
+
+    public Counter(String name) {
+        this.name = name;
+        counter = new AtomicLong();
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public long update(long delta) {
+        return counter.addAndGet(delta);
+    }
+
+    @Override
+    public long set(long value) {
+        long oldValue = counter.get();
+        counter.set(value);
+        return oldValue;
+    }
+
+    @Override
+    public long get() {
+        return counter.get();
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
new file mode 100644
index 0000000..f8fba7a
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+
+import java.io.Serializable;
+
+public class MultiResolutionEventProfiler implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final int[] times;
+
+    private long offset;
+
+    private int ptr;
+
+    private int resolution;
+
+    private int eventCounter;
+
+    public MultiResolutionEventProfiler(int nSamples) {
+        times = new int[nSamples];
+        offset = -1;
+        ptr = 0;
+        resolution = 1;
+        eventCounter = 0;
+    }
+
+    public void reportEvent() {
+        ++eventCounter;
+        if (eventCounter % resolution != 0) {
+            return;
+        }
+        if (ptr >= times.length) {
+            compact();
+            return;
+        }
+        eventCounter = 0;
+        long time = System.currentTimeMillis();
+        if (offset < 0) {
+            offset = time;
+        }
+        int value = (int) (time - offset);
+        times[ptr++] = value;
+    }
+
+    private void compact() {
+        for (int i = 1; i < ptr / 2; ++i) {
+            times[i] = times[i * 2];
+        }
+        resolution <<= 1;
+        ptr >>= 1;
+    }
+
+    public int getResolution() {
+        return resolution;
+    }
+
+    public int getCount() {
+        return ptr;
+    }
+
+    public int[] getSamples() {
+        return times;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java
new file mode 100644
index 0000000..b4e3619
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public abstract class AbstractProfile implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    protected final Map<String, Long> counters;
+
+    public AbstractProfile() {
+        counters = new HashMap<String, Long>();
+    }
+
+    public Map<String, Long> getCounters() {
+        return counters;
+    }
+
+    public abstract JSONObject toJSON() throws JSONException;
+
+    protected void populateCounters(JSONObject jo) throws JSONException {
+        JSONArray countersObj = new JSONArray();
+        for (Map.Entry<String, Long> e : counters.entrySet()) {
+            JSONObject jpe = new JSONObject();
+            jpe.put("name", e.getKey());
+            jpe.put("value", e.getValue());
+            countersObj.put(jpe);
+        }
+        jo.put("counters", countersObj);
+    }
+
+    protected void merge(AbstractProfile profile) {
+        counters.putAll(profile.counters);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
new file mode 100644
index 0000000..46a964a
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -0,0 +1,57 @@
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class JobProfile extends AbstractProfile {
+    private static final long serialVersionUID = 1L;
+
+    private final JobId jobId;
+
+    private final Map<String, JobletProfile> jobletProfiles;
+
+    public JobProfile(JobId jobId) {
+        this.jobId = jobId;
+        jobletProfiles = new HashMap<String, JobletProfile>();
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public Map<String, JobletProfile> getJobletProfiles() {
+        return jobletProfiles;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject json = new JSONObject();
+
+        json.put("job-id", jobId.toString());
+        populateCounters(json);
+        JSONArray jobletsArray = new JSONArray();
+        for (JobletProfile p : jobletProfiles.values()) {
+            jobletsArray.put(p.toJSON());
+        }
+        json.put("joblets", jobletsArray);
+
+        return json;
+    }
+
+    public void merge(JobProfile other) {
+        super.merge(this);
+        for (JobletProfile jp : other.jobletProfiles.values()) {
+            if (jobletProfiles.containsKey(jp.getNodeId())) {
+                jobletProfiles.get(jp.getNodeId()).merge(jp);
+            } else {
+                jobletProfiles.put(jp.getNodeId(), jp);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java
new file mode 100644
index 0000000..0c60006
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+
+public class JobletProfile extends AbstractProfile {
+    private static final long serialVersionUID = 1L;
+
+    private final String nodeId;
+
+    private final Map<TaskAttemptId, TaskProfile> taskProfiles;
+
+    public JobletProfile(String nodeId) {
+        this.nodeId = nodeId;
+        taskProfiles = new HashMap<TaskAttemptId, TaskProfile>();
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public Map<TaskAttemptId, TaskProfile> getTaskProfiles() {
+        return taskProfiles;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject json = new JSONObject();
+
+        json.put("node-id", nodeId.toString());
+        populateCounters(json);
+        JSONArray tasks = new JSONArray();
+        for (TaskProfile p : taskProfiles.values()) {
+            tasks.put(p.toJSON());
+        }
+        json.put("tasks", tasks);
+
+        return json;
+    }
+
+    public void merge(JobletProfile jp) {
+        super.merge(this);
+        for (TaskProfile tp : jp.taskProfiles.values()) {
+            if (taskProfiles.containsKey(tp.getTaskId())) {
+                taskProfiles.get(tp.getTaskId()).merge(tp);
+            } else {
+                taskProfiles.put(tp.getTaskId(), tp);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
new file mode 100644
index 0000000..ef61796
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
+
+public class PartitionProfile implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final PartitionId pid;
+
+    private final long openTime;
+
+    private final long closeTime;
+
+    private final MultiResolutionEventProfiler mrep;
+
+    public PartitionProfile(PartitionId pid, long openTime, long closeTime, MultiResolutionEventProfiler mrep) {
+        this.pid = pid;
+        this.openTime = openTime;
+        this.closeTime = closeTime;
+        this.mrep = mrep;
+    }
+
+    public PartitionId getPartitionId() {
+        return pid;
+    }
+
+    public long getOpenTime() {
+        return openTime;
+    }
+
+    public long getCloseTime() {
+        return closeTime;
+    }
+
+    public MultiResolutionEventProfiler getSamples() {
+        return mrep;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
new file mode 100644
index 0000000..0f82844
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
+
+public class TaskProfile extends AbstractProfile {
+    private static final long serialVersionUID = 1L;
+
+    private final TaskAttemptId taskAttemptId;
+
+    private final Map<PartitionId, PartitionProfile> partitionSendProfile;
+
+    public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile) {
+        this.taskAttemptId = taskAttemptId;
+        this.partitionSendProfile = new HashMap<PartitionId, PartitionProfile>(partitionSendProfile);
+    }
+
+    public TaskAttemptId getTaskId() {
+        return taskAttemptId;
+    }
+
+    public Map<PartitionId, PartitionProfile> getPartitionSendProfile() {
+        return partitionSendProfile;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject json = new JSONObject();
+
+        json.put("activity-id", taskAttemptId.getTaskId().getActivityId().toString());
+        json.put("partition", taskAttemptId.getTaskId().getPartition());
+        json.put("attempt", taskAttemptId.getAttempt());
+        if (partitionSendProfile != null) {
+            JSONArray pspArray = new JSONArray();
+            for (PartitionProfile pp : partitionSendProfile.values()) {
+                JSONObject ppObj = new JSONObject();
+                PartitionId pid = pp.getPartitionId();
+                JSONObject pidObj = new JSONObject();
+                pidObj.put("job-id", pid.getJobId());
+                pidObj.put("connector-id", pid.getConnectorDescriptorId());
+                pidObj.put("sender-index", pid.getSenderIndex());
+                pidObj.put("receiver-index", pid.getReceiverIndex());
+                ppObj.put("partition-id", pidObj);
+                ppObj.put("open-time", pp.getOpenTime());
+                ppObj.put("close-time", pp.getCloseTime());
+                MultiResolutionEventProfiler samples = pp.getSamples();
+                ppObj.put("offset", samples.getOffset());
+                int resolution = samples.getResolution();
+                int sampleCount = samples.getCount();
+                JSONArray ftA = new JSONArray();
+                int[] ft = samples.getSamples();
+                for (int i = 0; i < sampleCount; ++i) {
+                    ftA.put(ft[i]);
+                }
+                ppObj.put("frame-times", ftA);
+                ppObj.put("resolution", resolution);
+                pspArray.put(ppObj);
+            }
+            json.put("partition-send-profile", pspArray);
+        }
+        populateCounters(json);
+
+        return json;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/logs/LogFile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/logs/LogFile.java
new file mode 100644
index 0000000..1521aa0
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/logs/LogFile.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.logs;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+
+import org.json.JSONObject;
+
+public class LogFile {
+    private final File root;
+
+    private PrintWriter out;
+
+    public LogFile(File root) {
+        this.root = root;
+    }
+
+    public void open() throws Exception {
+        root.mkdirs();
+        out = new PrintWriter(new FileOutputStream(new File(root, String.valueOf(System.currentTimeMillis()) + ".log"),
+                true));
+    }
+
+    public void log(JSONObject object) throws Exception {
+        out.println(object.toString(1));
+        out.flush();
+    }
+
+    public void close() {
+        out.flush();
+        out.close();
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/AbstractService.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/AbstractService.java
new file mode 100644
index 0000000..4f9cb92
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/AbstractService.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.service;
+
+public abstract class AbstractService implements IService {
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/IService.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/IService.java
new file mode 100644
index 0000000..4e81066
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/IService.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.service;
+
+public interface IService {
+    public void start() throws Exception;
+
+    public void stop() throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java
new file mode 100644
index 0000000..4ead100
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.work;
+
+import java.util.logging.Level;
+
+public abstract class AbstractWork implements Runnable {
+    public Level logLevel() {
+        return Level.INFO;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
new file mode 100644
index 0000000..7eb4ff6
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.work;
+
+public class FutureValue<T> implements IResultCallback<T> {
+    private boolean done;
+
+    private T value;
+
+    private Exception e;
+
+    public FutureValue() {
+        done = false;
+        value = null;
+        e = null;
+    }
+
+    @Override
+    public synchronized void setValue(T value) {
+        done = true;
+        this.value = value;
+        e = null;
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void setException(Exception e) {
+        done = true;
+        this.e = e;
+        value = null;
+        notifyAll();
+    }
+
+    public synchronized void reset() {
+        done = false;
+        value = null;
+        e = null;
+        notifyAll();
+    }
+
+    public synchronized T get() throws Exception {
+        while (!done) {
+            wait();
+        }
+        if (e != null) {
+            throw e;
+        }
+        return value;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java
new file mode 100644
index 0000000..dcea864
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.control.common.work;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class IPCResponder<T> implements IResultCallback<T> {
+    private final IIPCHandle handle;
+
+    private final long rmid;
+
+    public IPCResponder(IIPCHandle handle, long rmid) {
+        this.handle = handle;
+        this.rmid = rmid;
+    }
+
+    @Override
+    public void setValue(T result) {
+        try {
+            handle.send(rmid, result, null);
+        } catch (IPCException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void setException(Exception e) {
+        try {
+            handle.send(rmid, null, e);
+        } catch (IPCException e1) {
+            e1.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java
new file mode 100644
index 0000000..80c3d76
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.work;
+
+public interface IResultCallback<T> {
+    public void setValue(T result);
+
+    public void setException(Exception e);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java
new file mode 100644
index 0000000..e94086f
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.work;
+
+public abstract class SynchronizableWork extends AbstractWork {
+    private boolean done;
+
+    private Exception e;
+
+    protected abstract void doRun() throws Exception;
+
+    public void init() {
+        done = false;
+        e = null;
+    }
+
+    @Override
+    public final void run() {
+        try {
+            doRun();
+        } catch (Exception e) {
+            this.e = e;
+        } finally {
+            synchronized (this) {
+                done = true;
+                notifyAll();
+            }
+        }
+    }
+
+    public final synchronized void sync() throws Exception {
+        while (!done) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw e;
+            }
+        }
+        if (e != null) {
+            throw e;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
new file mode 100644
index 0000000..26e5412
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.work;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+public class WorkQueue {
+    private static final Logger LOGGER = Logger.getLogger(WorkQueue.class.getName());
+
+    private final LinkedBlockingQueue<AbstractWork> queue;
+    private final WorkerThread thread;
+    private final Semaphore stopSemaphore;
+    private boolean stopped;
+    private final AtomicInteger enqueueCount;
+    private final AtomicInteger dequeueCount;
+
+    public WorkQueue() {
+        queue = new LinkedBlockingQueue<AbstractWork>();
+        thread = new WorkerThread();
+        stopSemaphore = new Semaphore(1);
+        enqueueCount = new AtomicInteger();
+        dequeueCount = new AtomicInteger();
+    }
+
+    public void start() throws HyracksException {
+        stopped = false;
+        enqueueCount.set(0);
+        dequeueCount.set(0);
+        try {
+            stopSemaphore.acquire();
+        } catch (InterruptedException e) {
+            throw new HyracksException(e);
+        }
+        thread.start();
+    }
+
+    public void stop() throws HyracksException {
+        synchronized (this) {
+            stopped = true;
+        }
+        schedule(new AbstractWork() {
+            @Override
+            public void run() {
+            }
+        });
+        try {
+            stopSemaphore.acquire();
+        } catch (InterruptedException e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    public void schedule(AbstractWork event) {
+        enqueueCount.incrementAndGet();
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("Enqueue: " + enqueueCount);
+        }
+        if (LOGGER.isLoggable(Level.FINER)) {
+            LOGGER.finer("Scheduling: " + event);
+        }
+        queue.offer(event);
+    }
+
+    public void scheduleAndSync(SynchronizableWork sRunnable) throws Exception {
+        schedule(sRunnable);
+        sRunnable.sync();
+    }
+
+    private class WorkerThread extends Thread {
+        WorkerThread() {
+            setDaemon(true);
+        }
+
+        @Override
+        public void run() {
+            try {
+                AbstractWork r;
+                while (true) {
+                    synchronized (WorkQueue.this) {
+                        if (stopped) {
+                            return;
+                        }
+                    }
+                    try {
+                        r = queue.take();
+                    } catch (InterruptedException e) {
+                        continue;
+                    }
+                    dequeueCount.incrementAndGet();
+                    if (LOGGER.isLoggable(Level.FINEST)) {
+                        LOGGER.finest("Dequeue: " + dequeueCount + "/" + enqueueCount);
+                    }
+                    try {
+                        if (LOGGER.isLoggable(r.logLevel())) {
+                            LOGGER.log(r.logLevel(), "Executing: " + r);
+                        }
+                        r.run();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            } finally {
+                stopSemaphore.release();
+            }
+        }
+    }
+}
\ No newline at end of file