add missing new files in merge hyracks_dev_next r847:977

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@979 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
new file mode 100644
index 0000000..a7f324d
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class HyracksClientInterfaceFunctions {
+    public enum FunctionId {
+        GET_CLUSTER_CONTROLLER_INFO,
+        CREATE_APPLICATION,
+        START_APPLICATION,
+        DESTROY_APPLICATION,
+        CREATE_JOB,
+        GET_JOB_STATUS,
+        START_JOB,
+        WAIT_FOR_COMPLETION,
+        GET_NODE_CONTROLLERS_INFO
+    }
+
+    public abstract static class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class GetClusterControllerInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_CLUSTER_CONTROLLER_INFO;
+        }
+    }
+
+    public static class CreateApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public CreateApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class StartApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public StartApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class DestroyApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public DestroyApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class CreateJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final byte[] jobSpec;
+        private final EnumSet<JobFlag> jobFlags;
+
+        public CreateJobFunction(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) {
+            this.appName = appName;
+            this.jobSpec = jobSpec;
+            this.jobFlags = jobFlags;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_JOB;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public byte[] getJobSpec() {
+            return jobSpec;
+        }
+
+        public EnumSet<JobFlag> getJobFlags() {
+            return jobFlags;
+        }
+    }
+
+    public static class GetJobStatusFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public GetJobStatusFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_JOB_STATUS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class StartJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public StartJobFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class WaitForCompletionFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public WaitForCompletionFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.WAIT_FOR_COMPLETION;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class GetNodeControllersInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_NODE_CONTROLLERS_INFO;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
new file mode 100644
index 0000000..f74d06e
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
+    private final IIPCHandle ipcHandle;
+
+    public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    @Override
+    public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+        return (ClusterControllerInfo) sync.call(ipcHandle, gccif);
+    }
+
+    @Override
+    public void createApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.CreateApplicationFunction caf = new HyracksClientInterfaceFunctions.CreateApplicationFunction(
+                appName);
+        sync.call(ipcHandle, caf);
+    }
+
+    @Override
+    public void startApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.StartApplicationFunction saf = new HyracksClientInterfaceFunctions.StartApplicationFunction(
+                appName);
+        sync.call(ipcHandle, saf);
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = new HyracksClientInterfaceFunctions.DestroyApplicationFunction(
+                appName);
+        sync.call(ipcHandle, daf);
+    }
+
+    @Override
+    public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.CreateJobFunction cjf = new HyracksClientInterfaceFunctions.CreateJobFunction(
+                appName, jobSpec, jobFlags);
+        return (JobId) sync.call(ipcHandle, cjf);
+    }
+
+    @Override
+    public JobStatus getJobStatus(JobId jobId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
+                jobId);
+        return (JobStatus) sync.call(ipcHandle, gjsf);
+    }
+
+    @Override
+    public void startJob(JobId jobId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
+                jobId);
+        sync.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public void waitForCompletion(JobId jobId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
+                jobId);
+        sync.call(ipcHandle, wfcf);
+    }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+        SyncRMI sync = new SyncRMI();
+        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+        return (Map<String, NodeControllerInfo>) sync.call(ipcHandle, gncif);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
new file mode 100644
index 0000000..1b38cb9
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.FileEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+
+/**
+ * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
+ * Controller.
+ * 
+ * @author vinayakb
+ * 
+ */
+public final class HyracksConnection implements IHyracksClientConnection {
+    private final String ccHost;
+
+    private final IPCSystem ipc;
+
+    private final IHyracksClientInterface hci;
+
+    private final ClusterControllerInfo ccInfo;
+
+    /**
+     * Constructor to create a connection to the Hyracks Cluster Controller.
+     * 
+     * @param ccHost
+     *            Host name (or IP Address) where the Cluster Controller can be
+     *            reached.
+     * @param ccPort
+     *            Port to reach the Hyracks Cluster Controller at the specified
+     *            host name.
+     * @throws Exception
+     */
+    public HyracksConnection(String ccHost, int ccPort) throws Exception {
+        this.ccHost = ccHost;
+        ipc = new IPCSystem(new InetSocketAddress(0));
+        ipc.start();
+        IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
+        this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle);
+        ccInfo = hci.getClusterControllerInfo();
+    }
+
+    @Override
+    public void createApplication(String appName, File harFile) throws Exception {
+        hci.createApplication(appName);
+        if (harFile != null) {
+            HttpClient hc = new DefaultHttpClient();
+            HttpPut put = new HttpPut("http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/" + appName);
+            put.setEntity(new FileEntity(harFile, "application/octet-stream"));
+            HttpResponse response = hc.execute(put);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                hci.destroyApplication(appName);
+                throw new HyracksException(response.getStatusLine().toString());
+            }
+        }
+        hci.startApplication(appName);
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        hci.destroyApplication(appName);
+    }
+
+    @Override
+    public JobId createJob(String appName, JobSpecification jobSpec) throws Exception {
+        return createJob(appName, jobSpec, EnumSet.noneOf(JobFlag.class));
+    }
+
+    @Override
+    public JobId createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        return hci.createJob(appName, JavaSerializationUtils.serialize(jobSpec), jobFlags);
+    }
+
+    @Override
+    public JobStatus getJobStatus(JobId jobId) throws Exception {
+        return hci.getJobStatus(jobId);
+    }
+
+    @Override
+    public void start(JobId jobId) throws Exception {
+        hci.startJob(jobId);
+    }
+
+    @Override
+    public void waitForCompletion(JobId jobId) throws Exception {
+        hci.waitForCompletion(jobId);
+    }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+        return hci.getNodeControllersInfo();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java
new file mode 100644
index 0000000..4a6f826
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITypeTraits extends Serializable {
+    public boolean isFixedLength();
+
+    public int getFixedLength();
+}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
new file mode 100644
index 0000000..b3aa406
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.ipc;
+
+import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
+import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class HyracksClientInterfaceDelegateIPCI implements IIPCI {
+    private final IHyracksClientInterface hci;
+
+    public HyracksClientInterfaceDelegateIPCI(IHyracksClientInterface hci) {
+        this.hci = hci;
+    }
+
+    @Override
+    public Object call(IIPCHandle caller, Object req) throws Exception {
+        HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) req;
+        switch (fn.getFunctionId()) {
+            case GET_CLUSTER_CONTROLLER_INFO: {
+                return hci.getClusterControllerInfo();
+            }
+
+            case CREATE_APPLICATION: {
+                HyracksClientInterfaceFunctions.CreateApplicationFunction caf = (HyracksClientInterfaceFunctions.CreateApplicationFunction) fn;
+                hci.createApplication(caf.getAppName());
+                return null;
+            }
+
+            case START_APPLICATION: {
+                HyracksClientInterfaceFunctions.StartApplicationFunction saf = (HyracksClientInterfaceFunctions.StartApplicationFunction) fn;
+                hci.startApplication(saf.getAppName());
+                return null;
+            }
+
+            case DESTROY_APPLICATION: {
+                HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = (HyracksClientInterfaceFunctions.DestroyApplicationFunction) fn;
+                hci.destroyApplication(daf.getAppName());
+                return null;
+            }
+
+            case CREATE_JOB: {
+                HyracksClientInterfaceFunctions.CreateJobFunction cjf = (HyracksClientInterfaceFunctions.CreateJobFunction) fn;
+                return hci.createJob(cjf.getAppName(), cjf.getJobSpec(), cjf.getJobFlags());
+            }
+
+            case GET_JOB_STATUS: {
+                HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+                return hci.getJobStatus(gjsf.getJobId());
+            }
+
+            case START_JOB: {
+                HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+                hci.startJob(sjf.getJobId());
+                return null;
+            }
+
+            case WAIT_FOR_COMPLETION: {
+                HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+                hci.waitForCompletion(wfcf.getJobId());
+                return null;
+            }
+
+            case GET_NODE_CONTROLLERS_INFO: {
+                return hci.getNodeControllersInfo();
+            }
+        }
+        throw new IllegalArgumentException("Unknown function " + fn.getFunctionId());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
new file mode 100644
index 0000000..3c8f7d0
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+
+public class ApplicationCreateWork extends AbstractWork {
+    private final ClusterControllerService ccs;
+    private final String appName;
+    private FutureValue<Object> fv;
+
+    public ApplicationCreateWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
+        this.ccs = ccs;
+        this.appName = appName;
+        this.fv = fv;
+    }
+
+    @Override
+    public void run() {
+        Map<String, CCApplicationContext> applications = ccs.getApplicationMap();
+        if (applications.containsKey(appName)) {
+            fv.setException(new HyracksException("Duplicate application with name: " + appName + " being created."));
+        }
+        CCApplicationContext appCtx;
+        try {
+            appCtx = new CCApplicationContext(ccs.getServerContext(), ccs.getCCContext(), appName);
+        } catch (IOException e) {
+            fv.setException(e);
+            return;
+        }
+        applications.put(appName, appCtx);
+        fv.setValue(null);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
new file mode 100644
index 0000000..e35f962
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class ClusterControllerDelegateIPCI implements IIPCI {
+    private final IClusterController cc;
+
+    public ClusterControllerDelegateIPCI(IClusterController cc) {
+        this.cc = cc;
+    }
+
+    @Override
+    public Object call(IIPCHandle caller, Object req) throws Exception {
+        ClusterControllerFunctions.Function fn = (Function) req;
+        switch (fn.getFunctionId()) {
+            case REGISTER_NODE: {
+                ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
+                return cc.registerNode(rnf.getNodeRegistration());
+            }
+
+            case UNREGISTER_NODE: {
+                ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
+                cc.unregisterNode(unf.getNodeId());
+                return null;
+            }
+
+            case NODE_HEARTBEAT: {
+                ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
+                cc.nodeHeartbeat(nhf.getNodeId(), nhf.getHeartbeatData());
+                return null;
+            }
+
+            case NOTIFY_JOBLET_CLEANUP: {
+                ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
+                cc.notifyJobletCleanup(njcf.getJobId(), njcf.getNodeId());
+                return null;
+            }
+
+            case REPORT_PROFILE: {
+                ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
+                cc.reportProfile(rpf.getNodeId(), rpf.getProfiles());
+                return null;
+            }
+
+            case NOTIFY_TASK_COMPLETE: {
+                ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
+                cc.notifyTaskComplete(ntcf.getJobId(), ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics());
+                return null;
+            }
+            case NOTIFY_TASK_FAILURE: {
+                ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
+                cc.notifyTaskFailure(ntff.getJobId(), ntff.getTaskId(), ntff.getDetails(), ntff.getDetails());
+                return null;
+            }
+
+            case REGISTER_PARTITION_PROVIDER: {
+                ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
+                cc.registerPartitionProvider(rppf.getPartitionDescriptor());
+                return null;
+            }
+
+            case REGISTER_PARTITION_REQUEST: {
+                ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
+                cc.registerPartitionRequest(rprf.getPartitionRequest());
+                return null;
+            }
+        }
+        throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
new file mode 100644
index 0000000..4c76357
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.Serializable;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+
+public class ClusterControllerFunctions {
+    public enum FunctionId {
+        REGISTER_NODE,
+        UNREGISTER_NODE,
+        NOTIFY_TASK_COMPLETE,
+        NOTIFY_TASK_FAILURE,
+        NOTIFY_JOBLET_CLEANUP,
+        NODE_HEARTBEAT,
+        REPORT_PROFILE,
+        REGISTER_PARTITION_PROVIDER,
+        REGISTER_PARTITION_REQUEST,
+    }
+
+    public static abstract class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class RegisterNodeFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final NodeRegistration reg;
+
+        public RegisterNodeFunction(NodeRegistration reg) {
+            this.reg = reg;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_NODE;
+        }
+
+        public NodeRegistration getNodeRegistration() {
+            return reg;
+        }
+    }
+
+    public static class UnregisterNodeFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+
+        public UnregisterNodeFunction(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.UNREGISTER_NODE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    public static class NotifyTaskCompleteFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final TaskAttemptId taskId;
+        private final String nodeId;
+        private final TaskProfile statistics;
+
+        public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
+            this.jobId = jobId;
+            this.taskId = taskId;
+            this.nodeId = nodeId;
+            this.statistics = statistics;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_TASK_COMPLETE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public TaskAttemptId getTaskId() {
+            return taskId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public TaskProfile getStatistics() {
+            return statistics;
+        }
+    }
+
+    public static class NotifyTaskFailureFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final TaskAttemptId taskId;
+        private final String nodeId;
+        private final String details;
+
+        public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+            this.jobId = jobId;
+            this.taskId = taskId;
+            this.nodeId = nodeId;
+            this.details = details;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_TASK_FAILURE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public TaskAttemptId getTaskId() {
+            return taskId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public String getDetails() {
+            return details;
+        }
+    }
+
+    public static class NotifyJobletCleanupFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final String nodeId;
+
+        public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
+            this.jobId = jobId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_JOBLET_CLEANUP;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    public static class NodeHeartbeatFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final HeartbeatData hbData;
+
+        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+            this.nodeId = nodeId;
+            this.hbData = hbData;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_HEARTBEAT;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public HeartbeatData getHeartbeatData() {
+            return hbData;
+        }
+    }
+
+    public static class ReportProfileFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final List<JobProfile> profiles;
+
+        public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
+            this.nodeId = nodeId;
+            this.profiles = profiles;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_PROFILE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public List<JobProfile> getProfiles() {
+            return profiles;
+        }
+    }
+
+    public static class RegisterPartitionProviderFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionDescriptor partitionDescriptor;
+
+        public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
+            this.partitionDescriptor = partitionDescriptor;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_PARTITION_PROVIDER;
+        }
+
+        public PartitionDescriptor getPartitionDescriptor() {
+            return partitionDescriptor;
+        }
+    }
+
+    public static class RegisterPartitionRequestFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionRequest partitionRequest;
+
+        public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
+            this.partitionRequest = partitionRequest;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_PARTITION_REQUEST;
+        }
+
+        public PartitionRequest getPartitionRequest() {
+            return partitionRequest;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
new file mode 100644
index 0000000..0aeab72
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class ClusterControllerRemoteProxy implements IClusterController {
+    private final IIPCHandle ipcHandle;
+
+    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    @Override
+    public NodeParameters registerNode(NodeRegistration reg) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.RegisterNodeFunction fn = new ClusterControllerFunctions.RegisterNodeFunction(reg);
+        NodeParameters result = (NodeParameters) sync.call(ipcHandle, fn);
+        return result;
+    }
+
+    @Override
+    public void unregisterNode(String nodeId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
+                nodeId);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+            throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
+                jobId, taskId, nodeId, statistics);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
+                jobId, taskId, nodeId, details);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
+                jobId, nodeId);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
+                hbData);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
+                profiles);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
+                partitionDescriptor);
+        sync.call(ipcHandle, fn);
+    }
+
+    @Override
+    public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
+                partitionRequest);
+        sync.call(ipcHandle, fn);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
new file mode 100644
index 0000000..f3e51ec
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class NodeControllerDelegateIPCI implements IIPCI {
+    private final INodeController nc;
+
+    public NodeControllerDelegateIPCI(INodeController nc) {
+        this.nc = nc;
+    }
+
+    @Override
+    public Object call(IIPCHandle caller, Object req) throws Exception {
+        NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) req;
+        switch (fn.getFunctionId()) {
+            case START_TASKS: {
+                NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
+                nc.startTasks(stf.getAppName(), stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(),
+                        stf.getConnectorPolicies());
+                return null;
+            }
+
+            case ABORT_TASKS: {
+                NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
+                nc.abortTasks(atf.getJobId(), atf.getTasks());
+                return null;
+            }
+
+            case CLEANUP_JOBLET: {
+                NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
+                nc.cleanUpJoblet(cjf.getJobId(), cjf.getStatus());
+                return null;
+            }
+
+            case CREATE_APPLICATION: {
+                NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
+                nc.createApplication(caf.getAppName(), caf.isDeployHar(), caf.getSerializedDistributedState());
+                return null;
+            }
+
+            case DESTROY_APPLICATION: {
+                NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
+                nc.destroyApplication(daf.getAppName());
+                return null;
+            }
+
+            case REPORT_PARTITION_AVAILABILITY: {
+                NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
+                nc.reportPartitionAvailability(rpaf.getPartitionId(), rpaf.getNetworkAddress());
+                return null;
+            }
+        }
+        throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
new file mode 100644
index 0000000..0d39c1b
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+
+public class NodeControllerFunctions {
+    public enum FunctionId {
+        START_TASKS,
+        ABORT_TASKS,
+        CLEANUP_JOBLET,
+        CREATE_APPLICATION,
+        DESTROY_APPLICATION,
+        REPORT_PARTITION_AVAILABILITY
+    }
+
+    public static abstract class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class StartTasksFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final JobId jobId;
+        private final byte[] planBytes;
+        private final List<TaskAttemptDescriptor> taskDescriptors;
+        private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
+        public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
+                List<TaskAttemptDescriptor> taskDescriptors,
+                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+            this.appName = appName;
+            this.jobId = jobId;
+            this.planBytes = planBytes;
+            this.taskDescriptors = taskDescriptors;
+            this.connectorPolicies = connectorPolicies;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_TASKS;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public byte[] getPlanBytes() {
+            return planBytes;
+        }
+
+        public List<TaskAttemptDescriptor> getTaskDescriptors() {
+            return taskDescriptors;
+        }
+
+        public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
+            return connectorPolicies;
+        }
+    }
+
+    public static class AbortTasksFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final List<TaskAttemptId> tasks;
+
+        public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
+            this.jobId = jobId;
+            this.tasks = tasks;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.ABORT_TASKS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public List<TaskAttemptId> getTasks() {
+            return tasks;
+        }
+    }
+
+    public static class CleanupJobletFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final JobStatus status;
+
+        public CleanupJobletFunction(JobId jobId, JobStatus status) {
+            this.jobId = jobId;
+            this.status = status;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CLEANUP_JOBLET;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public JobStatus getStatus() {
+            return status;
+        }
+    }
+
+    public static class CreateApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final boolean deployHar;
+        private final byte[] serializedDistributedState;
+
+        public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
+            this.appName = appName;
+            this.deployHar = deployHar;
+            this.serializedDistributedState = serializedDistributedState;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public boolean isDeployHar() {
+            return deployHar;
+        }
+
+        public byte[] getSerializedDistributedState() {
+            return serializedDistributedState;
+        }
+    }
+
+    public static class DestroyApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public DestroyApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class ReportPartitionAvailabilityFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionId pid;
+        private final NetworkAddress networkAddress;
+
+        public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
+            this.pid = pid;
+            this.networkAddress = networkAddress;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_PARTITION_AVAILABILITY;
+        }
+
+        public PartitionId getPartitionId() {
+            return pid;
+        }
+
+        public NetworkAddress getNetworkAddress() {
+            return networkAddress;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
new file mode 100644
index 0000000..ccd9468
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class NodeControllerRemoteProxy implements INodeController {
+    private final IIPCHandle ipcHandle;
+
+    public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
+    }
+
+    @Override
+    public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
+                planBytes, taskDescriptors, connectorPolicies);
+        sync.call(ipcHandle, stf);
+    }
+
+    @Override
+    public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
+        sync.call(ipcHandle, atf);
+    }
+
+    @Override
+    public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.CleanupJobletFunction cjf = new NodeControllerFunctions.CleanupJobletFunction(jobId,
+                status);
+        sync.call(ipcHandle, cjf);
+    }
+
+    @Override
+    public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+            throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.CreateApplicationFunction caf = new NodeControllerFunctions.CreateApplicationFunction(
+                appName, deployHar, serializedDistributedState);
+        sync.call(ipcHandle, caf);
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        SyncRMI sync = new SyncRMI();
+        NodeControllerFunctions.DestroyApplicationFunction daf = new NodeControllerFunctions.DestroyApplicationFunction(
+                appName);
+        sync.call(ipcHandle, daf);
+    }
+
+    @Override
+    public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
+        NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = new NodeControllerFunctions.ReportPartitionAvailabilityFunction(
+                pid, networkAddress);
+        ipcHandle.send(rpaf, null);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
new file mode 100644
index 0000000..f8fba7a
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+
+import java.io.Serializable;
+
+public class MultiResolutionEventProfiler implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final int[] times;
+
+    private long offset;
+
+    private int ptr;
+
+    private int resolution;
+
+    private int eventCounter;
+
+    public MultiResolutionEventProfiler(int nSamples) {
+        times = new int[nSamples];
+        offset = -1;
+        ptr = 0;
+        resolution = 1;
+        eventCounter = 0;
+    }
+
+    public void reportEvent() {
+        ++eventCounter;
+        if (eventCounter % resolution != 0) {
+            return;
+        }
+        if (ptr >= times.length) {
+            compact();
+            return;
+        }
+        eventCounter = 0;
+        long time = System.currentTimeMillis();
+        if (offset < 0) {
+            offset = time;
+        }
+        int value = (int) (time - offset);
+        times[ptr++] = value;
+    }
+
+    private void compact() {
+        for (int i = 1; i < ptr / 2; ++i) {
+            times[i] = times[i * 2];
+        }
+        resolution <<= 1;
+        ptr >>= 1;
+    }
+
+    public int getResolution() {
+        return resolution;
+    }
+
+    public int getCount() {
+        return ptr;
+    }
+
+    public int[] getSamples() {
+        return times;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/data/PointablePrimitiveValueProviderFactory.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/data/PointablePrimitiveValueProviderFactory.java
new file mode 100644
index 0000000..c159ba5
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/data/PointablePrimitiveValueProviderFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.common.data;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.INumeric;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+
+public class PointablePrimitiveValueProviderFactory implements IPrimitiveValueProviderFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final IPointableFactory pf;
+
+    public PointablePrimitiveValueProviderFactory(IPointableFactory pf) {
+        this.pf = pf;
+    }
+
+    @Override
+    public IPrimitiveValueProvider createPrimitiveValueProvider() {
+        final IPointable p = pf.createPointable();
+        ITypeTraits traits = pf.getTypeTraits();
+        assert traits.isFixedLength();
+        final int length = traits.getFixedLength();
+        return new IPrimitiveValueProvider() {
+            @Override
+            public double getValue(byte[] bytes, int offset) {
+                p.set(bytes, offset, length);
+                return ((INumeric) p).doubleValue();
+            }
+        };
+    }
+}
\ No newline at end of file