Changes required to support MessageBroker implementation

This change includes the following:

- API changes to support MessageBroker implementation.
- IResourceIdFactory interface to support application dependent implementation.

Change-Id: Ib9f49234eebe912c48e7f71980433a9b42595741
Reviewed-on: https://asterix-gerrit.ics.uci.edu/485
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
index 29401a1..1fb2291 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
@@ -23,6 +23,7 @@
 
 import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
 import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.api.service.IControllerService;
 
 /**
  * Base class of the {@link ICCApplicationContext} and the {@link INCApplicationContext}.
@@ -48,4 +49,9 @@
 
     public void setThreadFactory(ThreadFactory threadFactory);
 
+    /**
+     * @return The controller service which the application context belongs to.
+     */
+    public IControllerService getControllerService();
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 6023323..fd1d376 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -40,5 +40,5 @@
 
     public IDatasetPartitionManager getDatasetPartitionManager();
 
-    public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId, String nodeId) throws Exception;
+    public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
index 40a1baa..2b166d2 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
@@ -23,6 +23,6 @@
  */
 public interface IMessageBroker {
 
-    public void receivedMessage(IMessage message, String nodeId);
+    public void receivedMessage(IMessage message, String nodeId) throws Exception;
 
 }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
similarity index 91%
rename from hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java
rename to hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
index d6a4a45..c67ec45 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.control.common.service;
+package org.apache.hyracks.api.service;
 
-public interface IService {
+public interface IControllerService {
     public void start() throws Exception;
 
     public void stop() throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 1c27376..e8b2c27 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -47,6 +47,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobInfo;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.topology.TopologyDefinitionParser;
 import org.apache.hyracks.control.cc.application.CCApplicationContext;
@@ -84,7 +85,6 @@
 import org.apache.hyracks.control.cc.work.TaskFailureWork;
 import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
 import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
-import org.apache.hyracks.control.common.AbstractRemoteService;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
@@ -104,7 +104,7 @@
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 import org.xml.sax.InputSource;
 
-public class ClusterControllerService extends AbstractRemoteService {
+public class ClusterControllerService implements IControllerService {
     private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
 
     private final CCConfig ccConfig;
@@ -252,14 +252,14 @@
     }
 
     private void startApplication() throws Exception {
-        appCtx = new CCApplicationContext(serverCtx, ccContext);
+        appCtx = new CCApplicationContext(this, serverCtx, ccContext);
         appCtx.addJobLifecycleListener(datasetDirectoryService);
         String className = ccConfig.appCCMainClass;
         if (className != null) {
             Class<?> c = Class.forName(className);
             aep = (ICCApplicationEntryPoint) c.newInstance();
-            String[] args = ccConfig.appArgs == null ? null : ccConfig.appArgs.toArray(new String[ccConfig.appArgs
-                    .size()]);
+            String[] args = ccConfig.appArgs == null ? null
+                    : ccConfig.appArgs.toArray(new String[ccConfig.appArgs.size()]);
             aep.start(appCtx, args);
         }
         executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
@@ -359,7 +359,8 @@
 
     private class HyracksClientInterfaceIPCI implements IIPCI {
         @Override
-        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
+                Exception exception) {
             HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
             switch (fn.getFunctionId()) {
                 case GET_CLUSTER_CONTROLLER_INFO: {
@@ -388,8 +389,8 @@
                 case START_JOB: {
                     HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
                     JobId jobId = createJobId();
-                    workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(), sjf
-                            .getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
+                    workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
+                            sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
                     return;
                 }
 
@@ -401,15 +402,15 @@
 
                 case GET_DATASET_RESULT_STATUS: {
                     HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
-                    workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(), gdrlf
-                            .getResultSetId(), new IPCResponder<Status>(handle, mid)));
+                    workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
+                            gdrlf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
                     return;
                 }
 
                 case GET_DATASET_RESULT_LOCATIONS: {
                     HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
-                    workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
-                            .getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
+                    workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this,
+                            gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
                             new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
                     return;
                 }
@@ -438,8 +439,8 @@
 
                 case CLI_DEPLOY_BINARY: {
                     HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
-                    workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(), dbf
-                            .getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
+                    workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
+                            dbf.getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
                     return;
                 }
 
@@ -483,22 +484,22 @@
 
                 case NODE_HEARTBEAT: {
                     CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
-                    workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(), nhf
-                            .getHeartbeatData()));
+                    workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(),
+                            nhf.getHeartbeatData()));
                     return;
                 }
 
                 case NOTIFY_JOBLET_CLEANUP: {
                     CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
-                    workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this,
-                            njcf.getJobId(), njcf.getNodeId()));
+                    workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this, njcf.getJobId(),
+                            njcf.getNodeId()));
                     return;
                 }
 
                 case NOTIFY_DEPLOY_BINARY: {
                     CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
-                    workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this,
-                            ndbf.getDeploymentId(), ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+                    workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this, ndbf.getDeploymentId(),
+                            ndbf.getNodeId(), ndbf.getDeploymentStatus()));
                     return;
                 }
 
@@ -510,35 +511,35 @@
 
                 case NOTIFY_TASK_COMPLETE: {
                     CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
-                    workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(), ntcf
-                            .getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
+                    workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(),
+                            ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
                     return;
                 }
                 case NOTIFY_TASK_FAILURE: {
                     CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
-                    workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), ntff
-                            .getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
+                    workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(),
+                            ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
                     return;
                 }
 
                 case REGISTER_PARTITION_PROVIDER: {
                     CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn;
-                    workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this, rppf
-                            .getPartitionDescriptor()));
+                    workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this,
+                            rppf.getPartitionDescriptor()));
                     return;
                 }
 
                 case REGISTER_PARTITION_REQUEST: {
                     CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn;
-                    workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this, rprf
-                            .getPartitionRequest()));
+                    workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this,
+                            rprf.getPartitionRequest()));
                     return;
                 }
 
                 case REGISTER_RESULT_PARTITION_LOCATION: {
                     CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
-                    workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
-                            .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
+                    workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this,
+                            rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
                             rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
                     return;
                 }
@@ -552,15 +553,15 @@
 
                 case REPORT_RESULT_PARTITION_FAILURE: {
                     CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
-                    workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this, rrplf
-                            .getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
+                    workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this,
+                            rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
                     return;
                 }
 
                 case SEND_APPLICATION_MESSAGE: {
                     CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
-                    workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(), rsf
-                            .getDeploymentId(), rsf.getNodeId()));
+                    workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
+                            rsf.getDeploymentId(), rsf.getNodeId()));
                     return;
                 }
 
@@ -637,10 +638,11 @@
         deploymentRunMap.remove(deploymentKey);
     }
 
-    public synchronized void setShutdownRun(ShutdownRun sRun){
+    public synchronized void setShutdownRun(ShutdownRun sRun) {
         shutdownCallback = sRun;
     }
-    public synchronized ShutdownRun getShutdownRun(){
+
+    public synchronized ShutdownRun getShutdownRun() {
         return shutdownCallback;
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index 988b678..a4f569b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -33,6 +33,8 @@
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ApplicationContext;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.work.IResultCallback;
@@ -48,10 +50,13 @@
 
     private List<IJobLifecycleListener> jobLifecycleListeners;
     private List<IClusterLifecycleListener> clusterLifecycleListeners;
+    private final ClusterControllerService ccs;
 
-    public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext) throws IOException {
+    public CCApplicationContext(ClusterControllerService ccs, ServerContext serverCtx, ICCContext ccContext)
+            throws IOException {
         super(serverCtx);
         this.ccContext = ccContext;
+        this.ccs = ccs;
         initPendingNodeIds = new HashSet<String>();
         deinitPendingNodeIds = new HashSet<String>();
         jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
@@ -107,4 +112,9 @@
             l.notifyNodeFailure(deadNodeIds);
         }
     }
+
+    @Override
+    public IControllerService getControllerService() {
+        return ccs;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 8718e4f..40a83a2 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -38,7 +38,8 @@
     private String nodeId;
     private ClusterControllerService ccs;
 
-    public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId, String nodeId) {
+    public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId,
+            String nodeId) {
         super(ccs, nodeId, null);
         this.ccs = ccs;
         this.deploymentId = deploymentId;
@@ -54,7 +55,11 @@
             ccs.getExecutor().execute(new Runnable() {
                 @Override
                 public void run() {
-                    ctx.getMessageBroker().receivedMessage(data, nodeId);
+                    try {
+                        ctx.getMessageBroker().receivedMessage(data, nodeId);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
                 }
             });
         } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/AbstractRemoteService.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/AbstractRemoteService.java
deleted file mode 100644
index 2f63416..0000000
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/AbstractRemoteService.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common;
-
-import org.apache.hyracks.control.common.service.IService;
-
-public abstract class AbstractRemoteService implements IService {
-    public AbstractRemoteService() {
-    }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index ec58b63..490faad 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -36,8 +36,8 @@
 
 public interface INodeController {
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
-            List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
+            List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
+            EnumSet<JobFlag> flags) throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 
@@ -52,4 +52,6 @@
     public void dumpState(String stateDumpId) throws Exception;
 
     public void shutDown() throws Exception;
+
+    public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 331d9ef..22ee6b3 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -45,8 +45,8 @@
 
     @Override
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
-            List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception {
+            List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
+            EnumSet<JobFlag> flags) throws Exception {
         CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
                 taskDescriptors, connectorPolicies, flags);
         ipcHandle.send(-1, stf, null);
@@ -94,4 +94,11 @@
         CCNCFunctions.ShutdownRequestFunction sdrf = new CCNCFunctions.ShutdownRequestFunction();
         ipcHandle.send(-1, sdrf, null);
     }
+
+    @Override
+    public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
+        CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data,
+                deploymentId, nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/AbstractService.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/AbstractService.java
deleted file mode 100644
index d2d0243..0000000
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/AbstractService.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.service;
-
-public abstract class AbstractService implements IService {
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 1a50211..7b5758c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -53,7 +53,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
-import org.apache.hyracks.control.common.AbstractRemoteService;
+import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -93,7 +93,7 @@
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 
-public class NodeControllerService extends AbstractRemoteService {
+public class NodeControllerService implements IControllerService {
     private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
 
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
@@ -316,7 +316,7 @@
     }
 
     private void startApplication() throws Exception {
-        appCtx = new NCApplicationContext(serverCtx, ctx, id, memoryManager, lccm);
+        appCtx = new NCApplicationContext(this, serverCtx, ctx, id, memoryManager, lccm);
         String className = ncConfig.appNCMainClass;
         if (className != null) {
             Class<?> c = Class.forName(className);
@@ -558,8 +558,8 @@
         }
     }
 
-    public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
-        ccs.sendApplicationMessageToCC(data, deploymentId, nodeId);
+    public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId) throws Exception {
+        ccs.sendApplicationMessageToCC(data, deploymentId, id);
     }
 
     public IDatasetPartitionManager getDatasetPartitionManager() {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 9a3582a..12df264 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -380,7 +380,7 @@
     }
 
     @Override
-    public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId, String nodeId) throws Exception {
-        this.ncs.sendApplicationMessageToCC(message, deploymentId, nodeId);
+    public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
+        this.ncs.sendApplicationMessageToCC(message, deploymentId);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
index 0a2aaec..e262738 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
@@ -27,8 +27,10 @@
 import org.apache.hyracks.api.context.IHyracksRootContext;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.resources.memory.IMemoryManager;
+import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.common.application.ApplicationContext;
 import org.apache.hyracks.control.common.context.ServerContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
 
 public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
@@ -38,14 +40,17 @@
     private final MemoryManager memoryManager;
     private Object appObject;
     private IStateDumpHandler sdh;
+    private final NodeControllerService ncs;
 
-    public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String nodeId,
-            MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager) throws IOException {
+    public NCApplicationContext(NodeControllerService ncs, ServerContext serverCtx, IHyracksRootContext rootCtx,
+            String nodeId, MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager)
+                    throws IOException {
         super(serverCtx);
         this.lccm = lifeCyclecomponentManager;
         this.nodeId = nodeId;
         this.rootCtx = rootCtx;
         this.memoryManager = memoryManager;
+        this.ncs = ncs;
         sdh = new IStateDumpHandler() {
 
             @Override
@@ -97,4 +102,9 @@
     public IMemoryManager getMemoryManager() {
         return memoryManager;
     }
+
+    @Override
+    public IControllerService getControllerService() {
+        return ncs;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 2b6b208..c6680bc 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -31,8 +31,8 @@
 import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 import org.apache.hyracks.storage.common.file.ILocalResourceFactory;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 import org.apache.hyracks.storage.common.file.LocalResource;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
 
 public abstract class IndexDataflowHelper implements IIndexDataflowHelper {
 
@@ -40,7 +40,7 @@
     protected final IHyracksTaskContext ctx;
     protected final IIndexLifecycleManager lcManager;
     protected final ILocalResourceRepository localResourceRepository;
-    protected final ResourceIdFactory resourceIdFactory;
+    protected final IResourceIdFactory resourceIdFactory;
     protected final FileReference file;
     protected final int partition;
     protected final int ioDeviceId;
@@ -57,8 +57,9 @@
         this.resourceIdFactory = opDesc.getStorageManager().getResourceIdFactory(ctx);
         this.partition = partition;
         this.ioDeviceId = opDesc.getFileSplitProvider().getFileSplits()[partition].getIODeviceId();
-        this.file = new FileReference(new File(IndexFileNameUtil.prepareFileName(opDesc.getFileSplitProvider()
-                .getFileSplits()[partition].getLocalFile().getFile().getPath(), ioDeviceId)));
+        this.file = new FileReference(new File(IndexFileNameUtil.prepareFileName(
+                opDesc.getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(),
+                ioDeviceId)));
         this.durable = durable;
         this.resourceName = file.getFile().getPath();
     }
@@ -92,8 +93,8 @@
                 resourceID = resourceIdFactory.createId();
                 ILocalResourceFactory localResourceFactory = opDesc.getLocalResourceFactoryProvider()
                         .getLocalResourceFactory();
-                localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, resourceName,
-                        partition));
+                localResourceRepository
+                        .insert(localResourceFactory.createLocalResource(resourceID, resourceName, partition));
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManagerInterface.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManagerInterface.java
index 8d74ce3..7a68051 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManagerInterface.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManagerInterface.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public interface IStorageManagerInterface extends Serializable {
     public IBufferCache getBufferCache(IHyracksTaskContext ctx);
@@ -33,5 +33,5 @@
 
     public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx);
 
-    public ResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx);
+    public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
similarity index 72%
copy from hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java
copy to hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
index d6a4a45..42fd6d4 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
@@ -16,10 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.control.common.service;
+package org.apache.hyracks.storage.common.file;
 
-public interface IService {
-    public void start() throws Exception;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-    public void stop() throws Exception;
+public interface IResourceIdFactory {
+
+    /**
+     * @return A unique monotonically increasing id.
+     * @throws Exception
+     */
+    long createId() throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/ResourceIdFactory.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/ResourceIdFactory.java
index 0adf998..625d247 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/ResourceIdFactory.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/ResourceIdFactory.java
@@ -20,13 +20,14 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 
-public class ResourceIdFactory {
+public class ResourceIdFactory implements IResourceIdFactory {
     private AtomicLong id = null;
 
     public ResourceIdFactory(long initialValue) {
         id = new AtomicLong(initialValue);
     }
 
+    @Override
     public long createId() {
         return id.getAndIncrement();
     }
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
index b2a5765..f619be0 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.api.resources.memory.IMemoryManager;
+import org.apache.hyracks.api.service.IControllerService;
 
 public class TestNCApplicationContext implements INCApplicationContext {
     private final ILifeCycleComponentManager lccm;
@@ -128,4 +129,9 @@
     @Override
     public void setStateDumpHandler(IStateDumpHandler handler) {
     }
+
+    @Override
+    public IControllerService getControllerService() {
+        return null;
+    }
 }
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index d8cf599..6d954eb 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -28,7 +28,6 @@
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
@@ -41,7 +40,7 @@
     private final TaskAttemptId taskId;
     private WorkspaceFileFactory fileFactory;
 
-    public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) throws HyracksException {
+    public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) {
         this.jobletContext = jobletContext;
         this.taskId = taskId;
         fileFactory = new WorkspaceFileFactory(this, (IOManager) getIOManager());
@@ -130,8 +129,7 @@
     }
 
     @Override
-    public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId, String nodeId) throws Exception {
-        // TODO Auto-generated method stub
+    public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
 
     }
 
@@ -139,4 +137,4 @@
     public ExecutorService getExecutorService() {
         return null;
     }
-}
+}
\ No newline at end of file