diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index c38b3fc..3912bd5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -24,6 +24,8 @@
 import java.io.PrintWriter;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IApplicationContext;
@@ -33,8 +35,10 @@
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.client.dataset.HyracksDataset;
 import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 
 public class AbstractQueryApiServlet extends AbstractServlet {
+    private static final Logger LOGGER = Logger.getLogger(AbstractQueryApiServlet.class.getName());
     protected final IApplicationContext appCtx;
 
     public enum ResultFields {
@@ -99,9 +103,19 @@
     }
 
     protected IHyracksDataset getHyracksDataset() throws Exception { // NOSONAR
-        synchronized (ctx) {
-            IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
-            if (hds == null) {
+        try {
+            return doGetHyracksDataset();
+        } catch (IPCException e) {
+            LOGGER.log(Level.WARNING, "Failed getting hyracks dataset connection. Resetting hyracks connection.", e);
+            ctx.put(HYRACKS_CONNECTION_ATTR, appCtx.getHcc());
+            return doGetHyracksDataset();
+        }
+    }
+
+    protected IHyracksDataset doGetHyracksDataset() throws Exception {
+        IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
+        if (hds == null) {
+            synchronized (ctx) {
                 hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
                 if (hds == null) {
                     hds = new HyracksDataset(getHyracksClientConnection(),
@@ -109,18 +123,16 @@
                     ctx.put(HYRACKS_DATASET_ATTR, hds);
                 }
             }
-            return hds;
         }
+        return hds;
     }
 
     protected IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
-        synchronized (ctx) {
-            final IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
-            if (hcc == null) {
-                throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
-            }
-            return hcc;
+        IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+        if (hcc == null) {
+            throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
         }
+        return hcc;
     }
 
     protected static UUID printRequestId(PrintWriter pw) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index da06dd1..616c22e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -45,6 +46,7 @@
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -64,9 +66,9 @@
     }
 
     @Override
-    protected void executeStatement(String statementsText,
-            SessionOutput sessionOutput, IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats,
-            RequestParameters param, long[] outExecStartEnd, Map<String, String> optionalParameters) throws Exception {
+    protected void executeStatement(String statementsText, SessionOutput sessionOutput,
+            IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param,
+            long[] outExecStartEnd, Map<String, String> optionalParameters) throws Exception {
         // Running on NC -> send 'execute' message to CC
         INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
         INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
@@ -83,10 +85,9 @@
             if (param.timeout != null) {
                 timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.timeout));
             }
-            ExecuteStatementRequestMessage requestMsg =
-                    new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
-                            statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl,
-                            optionalParameters);
+            ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
+                    responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery,
+                    param.clientContextID, handleUrl, optionalParameters);
             outExecStartEnd[0] = System.nanoTime();
             ncMb.sendMessageToCC(requestMsg);
             try {
@@ -148,7 +149,8 @@
 
     @Override
     protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
-        if (t instanceof IPCException || t instanceof TimeoutException) {
+        if (t instanceof TimeoutException
+                || (t instanceof HyracksDataException && ExceptionUtils.getRootCause(t) instanceof IPCException)) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.toString(), t);
             return HttpResponseStatus.SERVICE_UNAVAILABLE;
         } else {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 901aff8..cce5099 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -61,7 +61,6 @@
         IHyracksDataset hds = getHyracksDataset();
         ResultReader resultReader = new ResultReader(hds, handle.getJobId(), handle.getResultSetId());
 
-
         try {
             DatasetJobRecord.Status status = resultReader.getStatus();
 
@@ -98,7 +97,7 @@
             ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null);
         } catch (HyracksDataException e) {
             final int errorCode = e.getErrorCode();
-            if (ErrorCode.NO_RESULTSET == errorCode) {
+            if (ErrorCode.NO_RESULT_SET == errorCode) {
                 LOGGER.log(Level.INFO, "No results for: \"" + strHandle + "\"");
                 response.setStatus(HttpResponseStatus.NOT_FOUND);
                 return;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index eedc8ec..3e0f2c6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -76,10 +76,14 @@
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -119,14 +123,11 @@
     private IBufferCache bufferCache;
     private ITransactionSubsystem txnSubsystem;
     private IMetadataNode metadataNodeStub;
-
     private ILSMIOOperationScheduler lsmIOScheduler;
     private PersistentLocalResourceRepository localResourceRepository;
     private IIOManager ioManager;
     private boolean isShuttingdown;
-
     private ActiveManager activeManager;
-
     private IReplicationChannel replicationChannel;
     private IReplicationManager replicationManager;
     private IRemoteRecoveryManager remoteRecoveryManager;
@@ -134,6 +135,7 @@
     private final ILibraryManager libraryManager;
     private final NCExtensionManager ncExtensionManager;
     private final IStorageComponentProvider componentProvider;
+    private IHyracksClientConnection hcc;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
             throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -485,4 +487,22 @@
     public INCServiceContext getServiceContext() {
         return ncServiceContext;
     }
+
+    @Override
+    public IHyracksClientConnection getHcc() throws HyracksDataException {
+        if (hcc == null || !hcc.isConnected()) {
+            synchronized (this) {
+                if (hcc == null || !hcc.isConnected()) {
+                    try {
+                        NodeControllerService ncSrv = (NodeControllerService) ncServiceContext.getControllerService();
+                        ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo();
+                        hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
+                    } catch (Exception e) {
+                        throw HyracksDataException.create(e);
+                    }
+                }
+            }
+        }
+        return hcc;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index e8f63b4..3d7f870 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -48,9 +48,6 @@
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.client.ClusterControllerInfo;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
@@ -81,7 +78,7 @@
 
     @Override
     public void init(IServiceContext serviceCtx) throws Exception {
-        this.ncServiceCtx = (INCServiceContext) serviceCtx;
+        ncServiceCtx = (INCServiceContext) serviceCtx;
         ncServiceCtx.setThreadFactory(
                 new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager()));
     }
@@ -103,7 +100,7 @@
             System.setProperty("java.rmi.server.hostname",
                     (controllerService).getConfiguration().getClusterPublicAddress());
         }
-        runtimeContext = new NCAppRuntimeContext(this.ncServiceCtx, getExtensions());
+        runtimeContext = new NCAppRuntimeContext(ncServiceCtx, getExtensions());
         MetadataProperties metadataProperties = runtimeContext.getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(this.ncServiceCtx.getNodeId())) {
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -115,8 +112,8 @@
         MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
         IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
         this.ncServiceCtx.setMessageBroker(messageBroker);
-        MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
-                (NCMessageBroker) messageBroker, messagingProperties);
+        MessagingChannelInterfaceFactory interfaceFactory =
+                new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties);
         this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
 
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -228,8 +225,8 @@
         String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
                 .getStorageMountingPoints();
         for (String ioDevice : ioDevices) {
-            String tempDatasetsDir = ioDevice + storageDirName + File.separator
-                    + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
+            String tempDatasetsDir =
+                    ioDevice + storageDirName + File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
             File tmpDsDir = new File(tempDatasetsDir);
             if (tmpDsDir.exists()) {
                 IoUtil.delete(tmpDsDir);
@@ -307,10 +304,4 @@
             return devices.get(ioDeviceIndex);
         };
     }
-
-    protected IHyracksClientConnection getHcc() throws Exception {
-        NodeControllerService ncSrv = (NodeControllerService) ncServiceCtx.getControllerService();
-        ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo();
-        return new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
-    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index e1840d3..4e30d54 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -30,6 +30,8 @@
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IApplicationContext {
 
@@ -56,7 +58,17 @@
     /**
      * @return the library manager which implements {@link org.apache.asterix.common.library.ILibraryManager}
      */
-    public ILibraryManager getLibraryManager();
+    ILibraryManager getLibraryManager();
 
+    /**
+     * @return the service context
+     */
     IServiceContext getServiceContext();
+
+    /**
+     * @return a connected instance of {@link IHyracksClientConnection}
+     * @throws HyracksDataException
+     *             if connection couldn't be established to cluster controller
+     */
+    IHyracksClientConnection getHcc() throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 20f685a..690d1fd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.storage.common.IStorageManager;
 
@@ -71,11 +70,6 @@
     IJobLifecycleListener getActiveNotificationHandler();
 
     /**
-     * @return a new instance of {@link IHyracksClientConnection}
-     */
-    IHyracksClientConnection getHcc();
-
-    /**
      * @return the cluster wide resource id manager
      */
     IResourceIdManager getResourceIdManager();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index e4cc7f4..855031e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -45,7 +45,9 @@
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.asterix.runtime.transaction.ResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.storage.common.IStorageManager;
 
@@ -155,7 +157,18 @@
     }
 
     @Override
-    public IHyracksClientConnection getHcc() {
+    public IHyracksClientConnection getHcc() throws HyracksDataException {
+        if (!hcc.isConnected()) {
+            synchronized (this) {
+                if (!hcc.isConnected()) {
+                    try {
+                        hcc = new HyracksConnection(hcc.getHost(), hcc.getPort());
+                    } catch (Exception e) {
+                        throw HyracksDataException.create(e);
+                    }
+                }
+            }
+        }
         return hcc;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0142c7d..0ded84f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -70,8 +70,8 @@
 
     @Override
     public void cancelJob(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.CancelJobFunction cjf = new HyracksClientInterfaceFunctions.CancelJobFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.CancelJobFunction cjf =
+                new HyracksClientInterfaceFunctions.CancelJobFunction(jobId);
         rpci.call(ipcHandle, cjf);
     }
 
@@ -84,8 +84,8 @@
 
     @Override
     public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
-        HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
-                deploymentId, acggfBytes, jobFlags);
+        HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                new HyracksClientInterfaceFunctions.StartJobFunction(deploymentId, acggfBytes, jobFlags);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -165,8 +165,8 @@
             }
         }
         if (ipcHandle.isConnected()) {
-            throw new IPCException("CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS
-                    + " seconds");
+            throw new IPCException(
+                    "CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS + " seconds");
         }
     }
 
@@ -181,6 +181,11 @@
     public String getThreadDump(String node) throws Exception {
         HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
                 new HyracksClientInterfaceFunctions.ThreadDumpFunction(node);
-        return (String)rpci.call(ipcHandle, tdf);
+        return (String) rpci.call(ipcHandle, tdf);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return ipcHandle.isConnected();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 75cbf61..e979da6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -44,7 +44,6 @@
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
-import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.RPCInterface;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
@@ -58,6 +57,8 @@
 public final class HyracksConnection implements IHyracksClientConnection {
     private final String ccHost;
 
+    private final int ccPort;
+
     private final IPCSystem ipc;
 
     private final IHyracksClientInterface hci;
@@ -77,11 +78,11 @@
      */
     public HyracksConnection(String ccHost, int ccPort) throws Exception {
         this.ccHost = ccHost;
+        this.ccPort = ccPort;
         RPCInterface rpci = new RPCInterface();
         ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
         ipc.start();
-        IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
-        this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
+        hci = new HyracksClientInterfaceRemoteProxy(ipc.getHandle(new InetSocketAddress(ccHost, ccPort)), rpci);
         ccInfo = hci.getClusterControllerInfo();
     }
 
@@ -124,6 +125,7 @@
         return hci.startJob(jobId);
     }
 
+    @Override
     public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
     }
@@ -132,6 +134,7 @@
         return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
     }
 
+    @Override
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
         return hci.getDatasetDirectoryServiceInfo();
     }
@@ -242,4 +245,19 @@
     public String getThreadDump(String node) throws Exception {
         return hci.getThreadDump(node);
     }
+
+    @Override
+    public String getHost() {
+        return ccHost;
+    }
+
+    @Override
+    public int getPort() {
+        return ccPort;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return hci.isConnected();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 0956d85..0189135 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -44,7 +44,7 @@
      * @return {@link JobStatus}
      * @throws Exception
      */
-    public JobStatus getJobStatus(JobId jobId) throws Exception;
+    JobStatus getJobStatus(JobId jobId) throws Exception;
 
     /**
      * Gets detailed information about the specified Job.
@@ -54,7 +54,7 @@
      * @return {@link JobStatus}
      * @throws Exception
      */
-    public JobInfo getJobInfo(JobId jobId) throws Exception;
+    JobInfo getJobInfo(JobId jobId) throws Exception;
 
     /**
      * Cancel the job that has the given job id.
@@ -63,7 +63,7 @@
      *            the JobId of the Job
      * @throws Exception
      */
-    public void cancelJob(JobId jobId) throws Exception;
+    void cancelJob(JobId jobId) throws Exception;
 
     /**
      * Start the specified Job.
@@ -72,7 +72,7 @@
      *            Job Specification
      * @throws Exception
      */
-    public JobId startJob(JobSpecification jobSpec) throws Exception;
+    JobId startJob(JobSpecification jobSpec) throws Exception;
 
     /**
      * Start the specified Job.
@@ -83,7 +83,7 @@
      *            Flags
      * @throws Exception
      */
-    public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+    JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
 
     /**
      * Distribute the specified Job.
@@ -94,7 +94,7 @@
      *            Flags
      * @throws Exception
      */
-    public JobId distributeJob(JobSpecification jobSpec) throws Exception;
+    JobId distributeJob(JobSpecification jobSpec) throws Exception;
 
     /**
      * Destroy the distributed graph for a pre-distributed job
@@ -103,7 +103,7 @@
      *            The id of the predistributed job
      * @throws Exception
      */
-    public JobId destroyJob(JobId jobId) throws Exception;
+    JobId destroyJob(JobId jobId) throws Exception;
 
     /**
      * Used to run a pre-distributed job by id (the same JobId will be returned)
@@ -112,7 +112,7 @@
      *            The id of the predistributed job
      * @throws Exception
      */
-    public JobId startJob(JobId jobId) throws Exception;
+    JobId startJob(JobId jobId) throws Exception;
 
     /**
      * Start the specified Job.
@@ -123,7 +123,7 @@
      *            Flags
      * @throws Exception
      */
-    public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
+    JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
 
     /**
      * Gets the IP Address and port for the DatasetDirectoryService wrapped in NetworkAddress
@@ -131,7 +131,7 @@
      * @return {@link NetworkAddress}
      * @throws Exception
      */
-    public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
+    NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
 
     /**
      * Waits until the specified job has completed, either successfully or has
@@ -141,8 +141,7 @@
      *            JobId of the Job
      * @throws Exception
      */
-    public void waitForCompletion(JobId jobId) throws Exception;
-
+    void waitForCompletion(JobId jobId) throws Exception;
 
     /**
      * Deploy the user-defined jars to the cluster
@@ -150,7 +149,7 @@
      * @param jars
      *            a list of user-defined jars
      */
-    public DeploymentId deployBinary(List<String> jars) throws Exception;
+    DeploymentId deployBinary(List<String> jars) throws Exception;
 
     /**
      * undeploy a certain deployment
@@ -158,7 +157,7 @@
      * @param deploymentId
      *            the id for the deployment to be undeployed
      */
-    public void unDeployBinary(DeploymentId deploymentId) throws Exception;
+    void unDeployBinary(DeploymentId deploymentId) throws Exception;
 
     /**
      * Start the specified Job.
@@ -169,7 +168,7 @@
      *            Job Specification
      * @throws Exception
      */
-    public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
+    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
 
     /**
      * Start the specified Job.
@@ -182,8 +181,7 @@
      *            Flags
      * @throws Exception
      */
-    public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
-            throws Exception;
+    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
 
     /**
      * Start the specified Job.
@@ -196,27 +194,45 @@
      *            Flags
      * @throws Exception
      */
-    public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
-            EnumSet<JobFlag> jobFlags) throws Exception;
+    JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+            throws Exception;
 
     /**
      * Shuts down all NCs and then the CC.
+     *
      * @param terminateNCService
      */
-    public void stopCluster(boolean terminateNCService) throws Exception;
+    void stopCluster(boolean terminateNCService) throws Exception;
 
     /**
      * Get details of specified node as JSON object
+     *
      * @param nodeId
-     *              id the subject node
+     *            id the subject node
      * @param includeStats
-     * @param includeConfig @return serialized JSON containing the node details
+     * @param includeConfig
+     * @return serialized JSON containing the node details
      * @throws Exception
      */
-    public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception;
+    String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception;
 
     /**
      * Gets thread dump from the specified node as a serialized JSON string
      */
-    public String getThreadDump(String node) throws Exception;
+    String getThreadDump(String node) throws Exception;
+
+    /**
+     * @return true if the connection is alive, false otherwise
+     */
+    boolean isConnected();
+
+    /**
+     * @return the hostname of the cluster controller
+     */
+    String getHost();
+
+    /**
+     * @return the port of the cluster controller
+     */
+    int getPort();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 1afbe9e..9cebd3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -67,4 +67,6 @@
     public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception;
 
     public String getThreadDump(String node) throws Exception;
+
+    public boolean isConnected();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 68e7cd1..4bb2869 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -57,7 +57,7 @@
     public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
     public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
     public static final int DISTRIBUTED_JOB_FAILURE = 23;
-    public static final int NO_RESULTSET = 24;
+    public static final int NO_RESULT_SET = 24;
     public static final int JOB_CANCELED = 25;
     public static final int NODE_FAILED = 26;
     public static final int FILE_IS_NOT_DIRECTORY = 27;
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
index fdac7f1..31fd379 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
@@ -93,7 +93,7 @@
         try {
             return datasetDirectoryServiceConnection.getDatasetResultStatus(jobId, resultSetId);
         } catch (HyracksDataException e) {
-            if (e.getErrorCode() != ErrorCode.NO_RESULTSET) {
+            if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) {
                 LOGGER.log(Level.WARNING, "Exception retrieving result set for job " + jobId, e);
             }
         } catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 8400a59..7be6524 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -28,12 +28,12 @@
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -41,7 +41,7 @@
 public class NodeControllerState {
     private static final int RRD_SIZE = 720;
 
-    private final INodeController nodeController;
+    private final NodeControllerRemoteProxy nodeController;
 
     private final NCConfig ncConfig;
 
@@ -145,7 +145,7 @@
 
     private NodeCapacity capacity;
 
-    public NodeControllerState(INodeController nodeController, NodeRegistration reg) {
+    public NodeControllerState(NodeControllerRemoteProxy nodeController, NodeRegistration reg) {
         this.nodeController = nodeController;
         ncConfig = reg.getNCConfig();
         dataPort = reg.getDataPort();
@@ -251,7 +251,7 @@
         return lastHeartbeatDuration++;
     }
 
-    public INodeController getNodeController() {
+    public NodeControllerRemoteProxy getNodeController() {
         return nodeController;
     }
 
@@ -279,7 +279,7 @@
         return capacity;
     }
 
-    public synchronized ObjectNode toSummaryJSON()  {
+    public synchronized ObjectNode toSummaryJSON() {
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
         o.put("node-id", ncConfig.getNodeId());
@@ -289,7 +289,7 @@
         return o;
     }
 
-    public synchronized ObjectNode toDetailedJSON(boolean includeStats, boolean includeConfig)  {
+    public synchronized ObjectNode toDetailedJSON(boolean includeStats, boolean includeConfig) {
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 8cca1e0..2d43d42 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -44,6 +45,9 @@
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortCCJobsFunction;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 
 public class NodeManager implements INodeManager {
     private static final Logger LOGGER = Logger.getLogger(NodeManager.class.getName());
@@ -93,6 +97,13 @@
             LOGGER.warning(
                     "Node with name " + nodeId + " has already registered; failing the node then re-registering.");
             removeDeadNode(nodeId);
+        } else {
+            try {
+                IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
+                ncIPCHandle.send(-1, new AbortCCJobsFunction(), null);
+            } catch (IPCException e) {
+                throw HyracksDataException.create(e);
+            }
         }
         LOGGER.warning("adding node to registry");
         nodeRegistry.put(nodeId, ncState);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 7a9306c..ca1c91b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -107,7 +107,7 @@
     private DatasetJobRecord getNonNullDatasetJobRecord(JobId jobId) throws HyracksDataException {
         final DatasetJobRecord djr = getDatasetJobRecord(jobId);
         if (djr == null) {
-            throw HyracksDataException.create(ErrorCode.NO_RESULTSET, jobId);
+            throw HyracksDataException.create(ErrorCode.NO_RESULT_SET, jobId);
         }
         return djr;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index bf0846f..5866ba5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -28,7 +28,6 @@
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
@@ -55,7 +54,8 @@
         Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
             LOGGER.log(Level.WARNING, "Registering INodeController: id = " + id);
-            INodeController nc = new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
+            NodeControllerRemoteProxy nc =
+                    new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
             NodeControllerState state = new NodeControllerState(nc, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index f51dd06..7117b6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -18,14 +18,24 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.common.work.NoOpCallback;
 
 public class RegisterResultPartitionLocationWork extends AbstractWork {
+
+    private static final Logger LOGGER = Logger.getLogger(RegisterResultPartitionLocationWork.class.getName());
+
     private final ClusterControllerService ccs;
 
     private final JobId jobId;
@@ -43,8 +53,7 @@
     private final NetworkAddress networkAddress;
 
     public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress
-            networkAddress) {
+            boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
         this.rsId = rsId;
@@ -58,17 +67,24 @@
     @Override
     public void run() {
         try {
-            ccs.getDatasetDirectoryService()
-                    .registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, partition, nPartitions,
-                            networkAddress);
+            ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
+                    partition, nPartitions, networkAddress);
         } catch (HyracksDataException e) {
-            throw new RuntimeException(e);
+            LOGGER.log(Level.WARNING, "Failed to register partition location", e);
+            // Should fail the job if exists on cc, otherwise, do nothing
+            JobRun jobRun = ccs.getJobManager().get(jobId);
+            if (jobRun != null) {
+                List<Exception> exceptions = new ArrayList<>();
+                exceptions.add(e);
+                jobRun.getExecutor().abortJob(exceptions, NoOpCallback.INSTANCE);
+            }
         }
     }
 
     @Override
     public String toString() {
-        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
-                + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult + " EmptyResult@" + emptyResult;
+        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@"
+                + nPartitions + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult
+                + " EmptyResult@" + emptyResult;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index fa6580e..9b9a3b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -27,13 +27,19 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
 import org.apache.hyracks.control.cc.scheduler.ResourceManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class NodeManagerTest {
 
@@ -43,9 +49,9 @@
     private static final String NODE2 = "node2";
 
     @Test
-    public void testNormal() throws HyracksException {
+    public void testNormal() throws HyracksException, IPCException {
         IResourceManager resourceManager = new ResourceManager();
-        INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager);
+        INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
         NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
         NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
 
@@ -68,9 +74,9 @@
     }
 
     @Test
-    public void testException() throws HyracksException {
+    public void testException() throws HyracksException, IPCException {
         IResourceManager resourceManager = new ResourceManager();
-        INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager);
+        INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
         NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
 
         boolean invalidNetworkAddress = false;
@@ -86,6 +92,16 @@
         verifyEmptyCluster(resourceManager, nodeManager);
     }
 
+    private ClusterControllerService mockCcs() throws IPCException {
+        ClusterControllerService ccs = Mockito.mock(ClusterControllerService.class);
+        IPCSystem ipcSystem = Mockito.mock(IPCSystem.class);
+        IIPCHandle ipcHandle = Mockito.mock(IIPCHandle.class);
+        Mockito.when(ccs.getClusterIPC()).thenReturn(ipcSystem);
+        Mockito.when(ipcSystem.getHandle(Mockito.any())).thenReturn(ipcHandle);
+        Mockito.when(ipcSystem.getHandle(Mockito.any(), Mockito.anyInt())).thenReturn(ipcHandle);
+        return ccs;
+    }
+
     @Test
     public void testNullNode() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
@@ -112,6 +128,7 @@
 
     private NodeControllerState mockNodeControllerState(String nodeId, boolean invalidIpAddr) {
         NodeControllerState ncState = mock(NodeControllerState.class);
+        NodeControllerRemoteProxy ncProxy = Mockito.mock(NodeControllerRemoteProxy.class);
         String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2";
         NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001);
         NetworkAddress resultAddr = new NetworkAddress(ipAddr, 1002);
@@ -123,6 +140,7 @@
         NCConfig ncConfig = new NCConfig(nodeId);
         ncConfig.setDataPublicAddress(ipAddr);
         when(ncState.getNCConfig()).thenReturn(ncConfig);
+        Mockito.when(ncState.getNodeController()).thenReturn(ncProxy);
         return ncState;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 0d46d64..251aed8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -45,8 +45,8 @@
 import org.apache.hyracks.control.cc.application.CCServiceContext;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.cc.cluster.NodeManager;
-import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import org.apache.hyracks.control.common.logs.LogFile;
 import org.apache.hyracks.control.common.work.NoOpCallback;
 import org.junit.Assert;
@@ -293,7 +293,7 @@
     private INodeManager mockNodeManager() {
         INodeManager nodeManager = mock(NodeManager.class);
         NodeControllerState ncState = mock(NodeControllerState.class);
-        INodeController nodeController = mock(INodeController.class);
+        NodeControllerRemoteProxy nodeController = mock(NodeControllerRemoteProxy.class);
         when(nodeManager.getNodeControllerState(any())).thenReturn(ncState);
         when(ncState.getNodeController()).thenReturn(nodeController);
         return nodeManager;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index d42c4a8..4797ed7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -87,6 +87,7 @@
         NODE_REGISTRATION_RESULT,
         START_TASKS,
         ABORT_TASKS,
+        ABORT_ALL_JOBS,
         CLEANUP_JOBLET,
         REPORT_PARTITION_AVAILABILITY,
         SEND_APPLICATION_MESSAGE,
@@ -665,6 +666,16 @@
         }
     }
 
+    //TODO: Add CC id to this job to only abort jobs by this CC: https://issues.apache.org/jira/browse/ASTERIXDB-2110
+    public static class AbortCCJobsFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.ABORT_ALL_JOBS;
+        }
+    }
+
     public static class DistributeJobFunction extends Function {
         private static final long serialVersionUID = 1L;
 
@@ -782,7 +793,7 @@
 
             // read task attempt descriptors
             int tadSize = dis.readInt();
-            List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<TaskAttemptDescriptor>();
+            List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<>();
             for (int i = 0; i < tadSize; i++) {
                 TaskAttemptDescriptor tad = TaskAttemptDescriptor.create(dis);
                 taskDescriptors.add(tad);
@@ -790,7 +801,7 @@
 
             //read connector policies
             int cpSize = dis.readInt();
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<>();
             for (int i = 0; i < cpSize; i++) {
                 ConnectorDescriptorId cid = ConnectorDescriptorId.create(dis);
                 IConnectorPolicy policy = ConnectorPolicyFactory.INSTANCE.getConnectorPolicy(dis);
@@ -1362,8 +1373,8 @@
         int cdid = dis.readInt();
         int senderIndex = dis.readInt();
         int receiverIndex = dis.readInt();
-        PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex,
-                receiverIndex);
+        PartitionId pid =
+                new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
         return pid;
     }
 
@@ -1379,8 +1390,8 @@
         int aid = dis.readInt();
         int partition = dis.readInt();
         int attempt = dis.readInt();
-        TaskAttemptId taId = new TaskAttemptId(
-                new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt);
+        TaskAttemptId taId =
+                new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt);
         return taId;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index 2815ae1..d4ccbd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -21,13 +21,14 @@
 import java.net.InetSocketAddress;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 
 public abstract class ControllerRemoteProxy {
     protected final IPCSystem ipc;
-    protected final InetSocketAddress inetSocketAddress;
+    private final InetSocketAddress inetSocketAddress;
     private final IControllerRemoteProxyIPCEventListener eventListener;
     private IIPCHandle ipcHandle;
 
@@ -36,28 +37,33 @@
     }
 
     protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress,
-                                    IControllerRemoteProxyIPCEventListener eventListener) {
+            IControllerRemoteProxyIPCEventListener eventListener) {
         this.ipc = ipc;
         this.inetSocketAddress = inetSocketAddress;
-        this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {} : eventListener;
+        this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {
+        } : eventListener;
     }
 
-    protected IIPCHandle ensureIpcHandle() throws IPCException {
-        final boolean first = ipcHandle == null;
-        if (first || !ipcHandle.isConnected()) {
-            if (!first) {
-                getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
-                eventListener.ipcHandleDisconnected(ipcHandle);
-            }
-            ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
-            if (ipcHandle.isConnected()) {
-                if (first) {
-                    eventListener.ipcHandleConnected(ipcHandle);
-                } else {
-                    getLogger().warning("ipcHandle " + ipcHandle + " restored");
-                    eventListener.ipcHandleRestored(ipcHandle);
+    protected IIPCHandle ensureIpcHandle() throws HyracksDataException {
+        try {
+            final boolean first = ipcHandle == null;
+            if (first || !ipcHandle.isConnected()) {
+                if (!first) {
+                    getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+                    eventListener.ipcHandleDisconnected(ipcHandle);
+                }
+                ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
+                if (ipcHandle.isConnected()) {
+                    if (first) {
+                        eventListener.ipcHandleConnected(ipcHandle);
+                    } else {
+                        getLogger().warning("ipcHandle " + ipcHandle + " restored");
+                        eventListener.ipcHandleRestored(ipcHandle);
+                    }
                 }
             }
+        } catch (IPCException e) {
+            throw HyracksDataException.create(e);
         }
         return ipcHandle;
     }
@@ -65,4 +71,8 @@
     protected abstract int getRetries(boolean first);
 
     protected abstract Logger getLogger();
+
+    public InetSocketAddress getAddress() {
+        return inetSocketAddress;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index c416942..1eb1393 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
 import org.apache.hyracks.control.nc.task.ShutdownTask;
 import org.apache.hyracks.control.nc.task.ThreadDumpTask;
+import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
 import org.apache.hyracks.control.nc.work.AbortTasksWork;
 import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
@@ -55,10 +56,9 @@
         CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
         switch (fn.getFunctionId()) {
             case SEND_APPLICATION_MESSAGE:
-                CCNCFunctions.SendApplicationMessageFunction amf =
-                        (CCNCFunctions.SendApplicationMessageFunction) fn;
-                ncs.getWorkQueue().schedule(new ApplicationMessageWork(ncs, amf.getMessage(),
-                        amf.getDeploymentId(), amf.getNodeId()));
+                CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                ncs.getWorkQueue().schedule(
+                        new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId()));
                 return;
             case START_TASKS:
                 CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
@@ -69,6 +69,9 @@
                 CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
                 ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks()));
                 return;
+            case ABORT_ALL_JOBS:
+                ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs));
+                return;
             case CLEANUP_JOBLET:
                 CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
                 ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, cjf.getJobId(), cjf.getStatus()));
@@ -76,8 +79,8 @@
             case REPORT_PARTITION_AVAILABILITY:
                 CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
                         (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
-                ncs.getWorkQueue().schedule(new ReportPartitionAvailabilityWork(ncs,
-                        rpaf.getPartitionId(), rpaf.getNetworkAddress()));
+                ncs.getWorkQueue().schedule(
+                        new ReportPartitionAvailabilityWork(ncs, rpaf.getPartitionId(), rpaf.getNetworkAddress()));
                 return;
             case NODE_REGISTRATION_RESULT:
                 CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
@@ -92,8 +95,7 @@
 
             case DEPLOY_BINARY:
                 CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
-                ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(),
-                        dbf.getBinaryURLs()));
+                ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs()));
                 return;
 
             case UNDEPLOY_BINARY:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b0a702d..f4ec6e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -125,7 +125,7 @@
 
     private final Map<JobId, Joblet> jobletMap;
 
-    private final Map<JobId, ActivityClusterGraph> preDistributedJobActivityClusterGraphMap;
+    private final Map<JobId, ActivityClusterGraph> preDistributedJobs;
 
     private ExecutorService executor;
 
@@ -199,7 +199,7 @@
 
         workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<>();
-        preDistributedJobActivityClusterGraphMap = new Hashtable<>();
+        preDistributedJobs = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
                 new File(new File(NodeControllerService.class.getName()), id));
@@ -418,27 +418,27 @@
     }
 
     public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException {
-        if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
+        if (preDistributedJobs.get(jobId) != null) {
             throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
         }
-        preDistributedJobActivityClusterGraphMap.put(jobId, acg);
+        preDistributedJobs.put(jobId, acg);
     }
 
     public void removeActivityClusterGraph(JobId jobId) throws HyracksException {
-        if (preDistributedJobActivityClusterGraphMap.get(jobId) == null) {
+        if (preDistributedJobs.get(jobId) == null) {
             throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
         }
-        preDistributedJobActivityClusterGraphMap.remove(jobId);
+        preDistributedJobs.remove(jobId);
     }
 
     public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException {
-        if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
+        if (preDistributedJobs.get(jobId) != null) {
             throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
         }
     }
 
     public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException {
-        return preDistributedJobActivityClusterGraphMap.get(jobId);
+        return preDistributedJobs.get(jobId);
     }
 
     public NetworkManager getNetworkManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
similarity index 72%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 4fb4bf6..56100da 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -18,23 +18,23 @@
  */
 package org.apache.hyracks.control.nc.work;
 
-import java.util.Map;
+import java.util.Collection;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.apache.hyracks.control.nc.Joblet;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
 
-public class AbortAllTasksWork extends SynchronizableWork {
+public class AbortAllJobsWork extends SynchronizableWork {
 
-    private static final Logger LOGGER = Logger.getLogger(AbortAllTasksWork.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(AbortAllJobsWork.class.getName());
     private final NodeControllerService ncs;
 
-    public AbortAllTasksWork(NodeControllerService ncs) {
+    public AbortAllJobsWork(NodeControllerService ncs) {
         this.ncs = ncs;
     }
 
@@ -46,14 +46,16 @@
         IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
         if (dpm != null) {
             ncs.getDatasetPartitionManager().abortAllReaders();
+        } else {
+            LOGGER.log(Level.WARNING, "DatasetPartitionManager is null on " + ncs.getId());
         }
-        for (Joblet ji : ncs.getJobletMap().values()) {
-            Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
-            for (Task task : taskMap.values()) {
-                if (task != null) {
-                    task.abort();
-                }
+        Collection<Joblet> joblets = ncs.getJobletMap().values();
+        for (Joblet ji : joblets) {
+            Collection<Task> tasks = ji.getTaskMap().values();
+            for (Task task : tasks) {
+                task.abort();
             }
+            ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, ji.getJobId(), JobStatus.FAILURE));
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index 4a01fdb..caba5f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -23,6 +23,7 @@
 import static org.mockito.Mockito.verify;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -32,6 +33,8 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.cluster.NodeManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -69,9 +72,17 @@
         ccRoot.mkdir();
         ccConfig.setRootDir(ccRoot.getAbsolutePath());
         ClusterControllerService ccBase = new ClusterControllerService(ccConfig);
+        // The spying below is dangerous since it replaces the ClusterControllerService already referenced by many
+        // objects created in the constructor above
         cc = Mockito.spy(ccBase);
         cc.start();
 
+        // The following code partially fixes the problem created by the spying
+        INodeManager nodeManager = cc.getNodeManager();
+        Field ccsInNodeManager = NodeManager.class.getDeclaredField("ccs");
+        ccsInNodeManager.setAccessible(true);
+        ccsInNodeManager.set(nodeManager, cc);
+
         NCConfig ncConfig1 = new NCConfig(NC1_ID);
         ncConfig1.setClusterAddress("localhost");
         ncConfig1.setClusterPort(39001);
@@ -79,7 +90,7 @@
         ncConfig1.setDataListenAddress("127.0.0.1");
         ncConfig1.setResultListenAddress("127.0.0.1");
         ncConfig1.setResultSweepThreshold(5000);
-        ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
+        ncConfig1.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
         NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
         nc1 = Mockito.spy(nc1Base);
         nc1.start();
@@ -91,7 +102,7 @@
         ncConfig2.setDataListenAddress("127.0.0.1");
         ncConfig2.setResultListenAddress("127.0.0.1");
         ncConfig2.setResultSweepThreshold(5000);
-        ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
+        ncConfig2.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
         NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
         nc2 = Mockito.spy(nc2Base);
         nc2.start();
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
index aebe2f5..ba1c9a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
@@ -27,7 +27,7 @@
     private final Map<Long, Request> reqMap;
 
     public RPCInterface() {
-        reqMap = new HashMap<Long, RPCInterface.Request>();
+        reqMap = new HashMap<>();
     }
 
     public Object call(IIPCHandle handle, Object request) throws Exception {
