Re-register NC with CC on reestablished IPCHandle

In case of failed NC -> CC IPCHandle due to CC crash/restart, the NC
needs to re-register with the CC in order to rejoin the cluster, as the
CC ignore heartbeats from unregistered nodes.

- Improve toString on IPCHandle
- Add tests for killing & restarting CC / NCs to NCServiceExecutionIT
- Retrigger NCService on detected dead node
- Ensure jobIds are not reused on CC restart
- NCService shouldn't truncate NC log

Change-Id: I6f93ca9ab37e56e02bafcdecd1e2d0cf664faef6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1830
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 6ad85b3..c23da6c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -120,6 +120,7 @@
     private IFileMapManager fileMapManager;
     private IBufferCache bufferCache;
     private ITransactionSubsystem txnSubsystem;
+    private IMetadataNode metadataNodeStub;
 
     private ILSMIOOperationScheduler lsmIOScheduler;
     private PersistentLocalResourceRepository localResourceRepository;
@@ -464,9 +465,11 @@
 
     @Override
     public void exportMetadataNodeStub() throws RemoteException {
-        IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
-                getMetadataProperties().getMetadataPort());
-        ((IAsterixStateProxy) getServiceContext().getDistributedState()).setMetadataNode(stub);
+        if (metadataNodeStub == null) {
+            metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
+                    getMetadataProperties().getMetadataPort());
+        }
+        ((IAsterixStateProxy) getServiceContext().getDistributedState()).setMetadataNode(metadataNodeStub);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 107b859..7c8e153 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -67,7 +67,8 @@
     protected INCServiceContext ncServiceCtx;
     private INcApplicationContext runtimeContext;
     private String nodeId;
-    private boolean stopInitiated = false;
+    private boolean stopInitiated;
+    private boolean startupCompleted;
     private SystemState systemState;
     protected WebManager webManager;
 
@@ -190,6 +191,15 @@
         }
         // Request startup tasks from CC
         StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
+        startupCompleted = true;
+    }
+
+    @Override
+    public void onRegisterNode() throws Exception {
+        if (startupCompleted) {
+            // Request startup tasks from CC
+            StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 25b9f0a..edbcc7e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -72,6 +72,7 @@
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.methods.RequestBuilder;
 import org.apache.http.entity.ContentType;
@@ -1399,4 +1400,46 @@
         return servlet;
     }
 
+    public void waitForClusterActive(int timeoutSecs, TimeUnit timeUnit) throws Exception {
+        waitForClusterState("ACTIVE", timeoutSecs, timeUnit);
+    }
+
+    public void waitForClusterState(String desiredState, int timeout, TimeUnit timeUnit) throws Exception {
+        LOGGER.info("Waiting for cluster state " + desiredState + "...");
+        Thread t = new Thread(() -> {
+            while (true) {
+                try {
+                    final HttpClient client = HttpClients.createDefault();
+
+                    final HttpGet get = new HttpGet(createEndpointURI("/admin/cluster", null));
+                    final HttpResponse httpResponse = client.execute(get);
+                    final int statusCode = httpResponse.getStatusLine().getStatusCode();
+                    final String response = EntityUtils.toString(httpResponse.getEntity());
+                    if (statusCode != HttpStatus.SC_OK) {
+                        throw new Exception("HTTP error " + statusCode + ":\n" + response);
+                    }
+                    ObjectMapper om = new ObjectMapper();
+                    ObjectNode result = (ObjectNode) om.readTree(response);
+                    if (desiredState.equals(result.get("state").asText())) {
+                        break;
+                    }
+                } catch (Exception e) {
+                    // ignore, try again
+                }
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+        });
+        t.start();
+        timeUnit.timedJoin(t, timeout);
+        if (t.isAlive()) {
+            throw new Exception("Cluster did not become " + desiredState + " within " + timeout + " "
+                    + timeUnit.name().toLowerCase());
+        }
+        LOGGER.info("Cluster state now " + desiredState);
+    }
+
 }
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
index 6a619d3..9e51f8c 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
@@ -22,6 +22,8 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import org.apache.asterix.test.common.TestExecutor;
@@ -30,8 +32,11 @@
 import org.apache.asterix.testframework.xml.TestGroup;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.server.process.HyracksCCProcess;
+import org.apache.hyracks.server.process.HyracksNCServiceProcess;
 import org.apache.hyracks.server.process.HyracksVirtualCluster;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
@@ -92,11 +97,27 @@
 
     private static final Logger LOGGER = Logger.getLogger(NCServiceExecutionIT.class.getName());
 
+    enum KillCommand {
+        CC,
+        NC1,
+        NC2;
+
+        @Override
+        public String toString() {
+            return "<kill " + name().toLowerCase() + ">";
+        }
+    }
+
+    private static HyracksCCProcess cc;
+    private static HyracksNCServiceProcess nc1;
+    private static HyracksNCServiceProcess nc2;
+
     private final TestCaseContext tcCtx;
     private static final TestExecutor testExecutor = new TestExecutor();
 
     private static final List<String> badTestCases = new ArrayList<>();
     private static HyracksVirtualCluster cluster;
+    private final KillCommand killType;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -114,28 +135,20 @@
         HDFSCluster.getInstance().setup(ASTERIX_APP_DIR + File.separator);
 
         cluster = new HyracksVirtualCluster(new File(APP_HOME), new File(ASTERIX_APP_DIR));
-        cluster.addNCService(
+        nc1 = cluster.addNCService(
                 new File(CONF_DIR, "ncservice1.conf"),
                 new File(LOG_DIR, "ncservice1.log"));
-        cluster.addNCService(
+
+        nc2 = cluster.addNCService(
                 new File(CONF_DIR, "ncservice2.conf"),
                 new File(LOG_DIR, "ncservice2.log"));
 
-        try {
-            Thread.sleep(2000);
-        } catch (InterruptedException ignored) {
-        }
-
         // Start CC
-        cluster.start(
+        cc = cluster.start(
                 new File(CONF_DIR, "cc.conf"),
                 new File(LOG_DIR, "cc.log"));
 
-        LOGGER.info("Sleeping while cluster comes online...");
-        try {
-            Thread.sleep(6000);
-        } catch (InterruptedException ignored) {
-        }
+        testExecutor.waitForClusterActive(30, TimeUnit.SECONDS);
     }
 
     @AfterClass
@@ -157,16 +170,40 @@
 
     @Parameters(name = "NCServiceExecutionTest {index}: {0}")
     public static Collection<Object[]> tests() throws Exception {
-        Collection<Object[]> testArgs = new ArrayList<Object[]>();
+        Collection<Object[]> testArgs = new ArrayList<>();
+        Random random = getRandom();
         TestCaseContext.Builder b = new TestCaseContext.Builder();
         for (TestCaseContext ctx : b.build(new File(TESTS_DIR))) {
             if (!skip(ctx)) {
-                testArgs.add(new Object[] { ctx });
+                testArgs.add(new Object[] { ctx, ctx, null });
+            }
+            // let's kill something every 50 tests
+            if (testArgs.size() % 50 == 0) {
+                final KillCommand killCommand = KillCommand.values()[random.nextInt(KillCommand.values().length)];
+                testArgs.add(new Object[] { killCommand, null, killCommand});
             }
         }
         return testArgs;
     }
 
+    private static Random getRandom() {
+        Random random;
+        if (System.getProperty("random.seed") == null) {
+            random = new Random() {
+                @Override
+                public synchronized void setSeed(long seed) {
+                    super.setSeed(seed);
+                    System.err.println("using generated seed: " + seed + "; use -Drandom.seed to use specific seed");
+                }
+            };
+        } else {
+            final long seed = Long.getLong("random.seed");
+            System.err.println("using provided seed (-Drandom.seed): " + seed);
+            random = new Random(seed);
+        }
+        return random;
+    }
+
     private static boolean skip(TestCaseContext tcCtx) {
         // For now we skip feeds tests, external-library, and api tests.
         for (TestGroup group : tcCtx.getTestGroups()) {
@@ -180,13 +217,42 @@
         return false;
     }
 
-    public NCServiceExecutionIT(TestCaseContext ctx) {
-        this.tcCtx = ctx;
+    public NCServiceExecutionIT(Object description, TestCaseContext tcCtx, KillCommand killType) {
+        this.tcCtx = tcCtx;
+        this.killType = killType;
     }
 
     @Test
     public void test() throws Exception {
-        testExecutor.executeTest(ACTUAL_RESULTS_DIR, tcCtx, null, false);
-        testExecutor.cleanup(tcCtx.toString(), badTestCases);
+        if (tcCtx != null) {
+            testExecutor.executeTest(ACTUAL_RESULTS_DIR, tcCtx, null, false);
+            testExecutor.cleanup(tcCtx.toString(), badTestCases);
+        } else {
+            switch (killType) {
+                case CC:
+                    LOGGER.info("Killing CC...");
+                    cc.stop(true);
+                    cc.start();
+                    break;
+
+                case NC1:
+                    LOGGER.info("Killing NC1...");
+                    nc1.stop(); // we can't kill due to ASTERIXDB-1941
+                    testExecutor.waitForClusterState("UNUSABLE", 60, TimeUnit.SECONDS); // wait for missed heartbeats...
+                    nc1.start(); // this restarts the NC service
+                    break;
+
+                case NC2:
+                    LOGGER.info("Killing NC2...");
+                    nc2.stop(); // we can't kill due to ASTERIXDB-1941
+                    testExecutor.waitForClusterState("UNUSABLE", 60, TimeUnit.SECONDS); // wait for missed heartbeats...
+                    nc2.start(); // this restarts the NC service
+                    break;
+
+                default:
+                    Assert.fail("killType: " + killType);
+            }
+            testExecutor.waitForClusterActive(30, TimeUnit.SECONDS);
+        }
     }
 }
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
index 496d4cf..0beb38d 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
@@ -18,13 +18,17 @@
  */
 package org.apache.asterix.server.test;
 
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import org.apache.asterix.common.utils.Servlets;
 import org.apache.asterix.test.base.TestMethodTracer;
@@ -33,7 +37,7 @@
 import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.hyracks.util.file.FileUtil;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
@@ -48,13 +52,13 @@
     // Important paths and files for this test.
 
     // The "target" subdirectory of asterix-server. All outputs go here.
-    private static final String TARGET_DIR = FileUtil.joinPath("target");
+    private static final String TARGET_DIR = joinPath("target");
 
     // Directory where the NCs create and store all data, as configured by
     // src/test/resources/NCServiceExecutionIT/cc.conf.
-    private static final String OUTPUT_DIR = FileUtil.joinPath(TARGET_DIR, "sample local cluster");
+    private static final String OUTPUT_DIR = joinPath(TARGET_DIR, "sample local cluster");
 
-    private static final String LOCAL_SAMPLES_DIR = FileUtil.joinPath(OUTPUT_DIR, "opt", "local");
+    private static final String LOCAL_SAMPLES_DIR = joinPath(OUTPUT_DIR, "opt", "local");
 
     @Rule
     public TestRule watcher = new TestMethodTracer();
@@ -72,17 +76,41 @@
 
         String[] pathElements = new String[] { TARGET_DIR,
                 new File(TARGET_DIR).list((dir, name) -> name.matches("asterix-server.*-binary-assembly.zip"))[0] };
-        String installerZip = FileUtil.joinPath(pathElements);
+        String installerZip = joinPath(pathElements);
 
         TestHelper.unzip(installerZip, OUTPUT_DIR);
     }
 
+    private static List<File> findLogFiles(File directory, List<File> fileList) {
+        File [] match = directory.listFiles(pathname -> pathname.isDirectory() || pathname.toString().endsWith(".log"));
+        if (match != null) {
+            for (File file : match) {
+                if (file.isDirectory()) {
+                    findLogFiles(file, fileList);
+                } else {
+                    fileList.add(file);
+                }
+            }
+        }
+        return fileList;
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+
+        File destDir = new File(TARGET_DIR, joinPath("failsafe-reports", SampleLocalClusterIT.class.getSimpleName()));
+
+        for (File f : findLogFiles(new File(OUTPUT_DIR), new ArrayList<>())) {
+            FileUtils.copyFileToDirectory(f, destDir);
+        }
+    }
+
     @Test
     public void test0_startCluster() throws Exception {
-        Process process = new ProcessBuilder(FileUtil.joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh"), "-f")
+        Process process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh"), "-f")
                 .inheritIO().start();
         Assert.assertEquals(0, process.waitFor());
-        process = new ProcessBuilder(FileUtil.joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh")).inheritIO().start();
+        process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh")).inheritIO().start();
         Assert.assertEquals(0, process.waitFor());
     }
 
@@ -100,7 +128,7 @@
     @Test
     public void test2_stopCluster() throws Exception {
         Process process =
-                new ProcessBuilder(FileUtil.joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh")).inheritIO().start();
+                new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh")).inheritIO().start();
         Assert.assertEquals(0, process.waitFor());
         try {
             new URL("http://127.0.0.1:19002").openConnection().connect();
diff --git a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
index 72d28b9..2a1c652 100644
--- a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
+++ b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
@@ -39,3 +39,7 @@
 [cc]
 address = 127.0.0.1
 app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+
+[common]
+log.level = INFO
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index 02416c4..64f4e29 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -33,4 +33,5 @@
      */
     IFileDeviceResolver getFileDeviceResolver();
 
+    void onRegisterNode() throws Exception;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
index bd69f9e..dd63786 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
@@ -18,10 +18,16 @@
  */
 package org.apache.hyracks.api.job;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 public class JobIdFactory {
-    private long id = 0;
+    private final AtomicLong id = new AtomicLong(0);
 
     public JobId create() {
-        return new JobId(id++);
+        return new JobId(id.getAndIncrement());
+    }
+
+    public void ensureMinimumId(long id) {
+        this.id.updateAndGet(current -> Math.max(current, id));
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index ced3d67..b3e65ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -54,9 +54,9 @@
     private final ClusterControllerService ccs;
     private final JobIdFactory jobIdFactory;
 
-    ClientInterfaceIPCI(ClusterControllerService ccs) {
+    ClientInterfaceIPCI(ClusterControllerService ccs, JobIdFactory jobIdFactory) {
         this.ccs = ccs;
-        jobIdFactory = new JobIdFactory();
+        this.jobIdFactory = jobIdFactory;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index bec0bfb..627f406 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -25,6 +25,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -40,13 +41,16 @@
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.application.ICCApplication;
+import org.apache.hyracks.api.application.IClusterLifecycleListener;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
@@ -126,12 +130,14 @@
 
     private final IResourceManager resourceManager = new ResourceManager();
 
+    private final ICCApplication application;
+
+    private final JobIdFactory jobIdFactory;
+
     private IJobManager jobManager;
 
     private ShutdownRun shutdownCallback;
 
-    private final ICCApplication application;
-
     public ClusterControllerService(final CCConfig config) throws Exception {
         this(config, getApplication(config));
     }
@@ -162,6 +168,8 @@
 
         // Node manager is in charge of cluster membership management.
         nodeManager = new NodeManager(ccConfig, resourceManager);
+
+        jobIdFactory = new JobIdFactory();
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -184,7 +192,7 @@
         IIPCI ccIPCI = new ClusterControllerIPCI(this);
         clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
-        IIPCI ciIPCI = new ClientInterfaceIPCI(this);
+        IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory);
         clientIPC = new IPCSystem(
                 new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
                 ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
@@ -228,13 +236,18 @@
         }
     }
 
+    private Pair<String, Integer> getNCService(String nodeId) {
+        IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(nodeId);
+        return Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
+                ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT));
+    }
+
     private Map<String, Pair<String, Integer>> getNCServices() {
         Map<String, Pair<String, Integer>> ncMap = new TreeMap<>();
         for (String ncId : configManager.getNodeNames()) {
-            IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(ncId);
-            if (ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT) != NCConfig.NCSERVICE_PORT_DISABLED) {
-                ncMap.put(ncId, Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
-                        ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT)));
+            Pair<String, Integer> ncService = getNCService(ncId);
+            if (ncService.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) {
+                ncMap.put(ncId, ncService);
             }
         }
         return ncMap;
@@ -246,6 +259,23 @@
                     ncService.getValue().getLeft(), ncService.getValue().getRight(), ncService.getKey());
             workQueue.schedule(triggerWork);
         });
+        serviceCtx.addClusterLifecycleListener(new IClusterLifecycleListener() {
+            @Override
+            public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
+                // no-op, we don't care
+            }
+
+            @Override
+            public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
+                for (String nodeId : deadNodeIds) {
+                    Pair<String, Integer> ncService = getNCService(nodeId);
+
+                    final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
+                            ncService.getLeft(), ncService.getRight(), nodeId);
+                    workQueue.schedule(triggerWork);
+                }
+            }
+        });
     }
 
     private void terminateNCServices() throws Exception {
@@ -353,6 +383,10 @@
         return new NetworkAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
     }
 
+    public JobIdFactory getJobIdFactory() {
+        return jobIdFactory;
+    }
+
     private final class ClusterControllerContext implements ICCContext {
         private final ClusterTopology topology;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index dc7bad0..f44fca0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -69,6 +69,7 @@
             params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriod());
             params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
+            ccs.getJobIdFactory().ensureMinimumId(reg.getMaxJobId() + 1);
         } catch (Exception e) {
             result = new CCNCFunctions.NodeRegistrationResult(null, e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 490b6ff..89d6e78 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -72,11 +72,14 @@
 
     private final NodeCapacity capacity;
 
+    private final long maxJobId;
+
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
                             NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors,
                             String vmName, String vmVersion, String vmVendor, String classpath, String libraryPath,
                             String bootClasspath, List<String> inputArguments, Map<String, String> systemProperties,
-            HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity, int pid) {
+                            HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity, int pid,
+                            long maxJobId) {
         this.ncAddress = ncAddress;
         this.nodeId = nodeId;
         this.ncConfig = ncConfig;
@@ -98,6 +101,7 @@
         this.messagingPort = messagingPort;
         this.capacity = capacity;
         this.pid = pid;
+        this.maxJobId = maxJobId;
     }
 
     public InetSocketAddress getNodeControllerAddress() {
@@ -181,4 +185,8 @@
     }
 
     public int getPid() { return pid; }
+
+    public long getMaxJobId() {
+        return maxJobId;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 98d258f..31cb855 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -44,8 +44,9 @@
 
     private final int clusterConnectRetries;
 
-    public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, int clusterConnectRetries) {
-        super(ipc, inetSocketAddress);
+    public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, int clusterConnectRetries,
+                                        IControllerRemoteProxyIPCEventListener eventListener) {
+        super(ipc, inetSocketAddress, eventListener);
         this.clusterConnectRetries = clusterConnectRetries;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index 44b0e4a..2815ae1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -28,11 +28,18 @@
 public abstract class ControllerRemoteProxy {
     protected final IPCSystem ipc;
     protected final InetSocketAddress inetSocketAddress;
+    private final IControllerRemoteProxyIPCEventListener eventListener;
     private IIPCHandle ipcHandle;
 
     protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) {
+        this(ipc, inetSocketAddress, null);
+    }
+
+    protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress,
+                                    IControllerRemoteProxyIPCEventListener eventListener) {
         this.ipc = ipc;
         this.inetSocketAddress = inetSocketAddress;
+        this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {} : eventListener;
     }
 
     protected IIPCHandle ensureIpcHandle() throws IPCException {
@@ -40,10 +47,16 @@
         if (first || !ipcHandle.isConnected()) {
             if (!first) {
                 getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+                eventListener.ipcHandleDisconnected(ipcHandle);
             }
             ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
-            if (!first && ipcHandle.isConnected()) {
-                getLogger().warning("ipcHandle " + ipcHandle + " restored");
+            if (ipcHandle.isConnected()) {
+                if (first) {
+                    eventListener.ipcHandleConnected(ipcHandle);
+                } else {
+                    getLogger().warning("ipcHandle " + ipcHandle + " restored");
+                    eventListener.ipcHandleRestored(ipcHandle);
+                }
             }
         }
         return ipcHandle;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
new file mode 100644
index 0000000..ec4f9e4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.ipc;
+
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+
+public interface IControllerRemoteProxyIPCEventListener {
+
+    default void ipcHandleConnected(IIPCHandle handle) throws IPCException {
+        // no-op
+    }
+
+    default void ipcHandleDisconnected(IIPCHandle handle) throws IPCException {
+        // no-op
+    }
+
+    default void ipcHandleRestored(IIPCHandle handle) throws IPCException {
+        // no-op
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 20f6378..5afc98d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -53,6 +53,11 @@
     }
 
     @Override
+    public void onRegisterNode() throws Exception {
+        // no-op
+    }
+
+    @Override
     public void stop() throws Exception {
         // no-op
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0587a55..fc911c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -37,6 +37,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -65,6 +66,7 @@
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
+import org.apache.hyracks.control.common.ipc.IControllerRemoteProxyIPCEventListener;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.utils.PidHelper;
 import org.apache.hyracks.control.common.work.FutureValue;
@@ -80,7 +82,9 @@
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
+import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
@@ -157,6 +161,10 @@
 
     private final ConfigManager configManager;
 
+    private NodeRegistration nodeRegistration;
+
+    private final AtomicLong maxJobId = new AtomicLong(-1);
+
     public NodeControllerService(NCConfig config) throws Exception {
         this(config, getApplication(config));
     }
@@ -189,7 +197,6 @@
         threadMXBean = ManagementFactory.getThreadMXBean();
         runtimeMXBean = ManagementFactory.getRuntimeMXBean();
         osMXBean = ManagementFactory.getOperatingSystemMXBean();
-        registrationPending = true;
         getNodeControllerInfosAcceptor = new MutableObject<>();
         memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
         ioCounter = new IOCounterFactory().getIOCounter();
@@ -280,34 +287,18 @@
         }
         this.ccs = new ClusterControllerRemoteProxy(ipc,
                 new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
-                ncConfig.getClusterConnectRetries());
-        HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
-        for (int i = 0; i < gcInfos.length; ++i) {
-            gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
-        }
-        HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
-        // Use "public" versions of network addresses and ports
-        NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
-        NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
-                : null;
-        int allCores = osMXBean.getAvailableProcessors();
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
-                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores,
-                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
-                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
-                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort,
-                application.getCapacity(), PidHelper.getPid()));
-
-        synchronized (this) {
-            while (registrationPending) {
-                wait();
+                ncConfig.getClusterConnectRetries(), new IControllerRemoteProxyIPCEventListener() {
+            @Override
+            public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
+                // we need to re-register in case the NC -> CC connection reset was due to CC shutdown
+                try {
+                    registerNode();
+                } catch (Exception e) {
+                    throw new IPCException(e);
+                }
             }
-        }
-        if (registrationException != null) {
-            throw registrationException;
-        }
-        serviceCtx.setDistributedState(nodeParameters.getDistributedState());
+        });
+        registerNode();
 
         workQueue.start();
 
@@ -330,12 +321,54 @@
         application.startupCompleted();
     }
 
+    public void registerNode() throws Exception {
+        LOGGER.info("Registering with Cluster Controller");
+        registrationPending = true;
+        HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
+        for (int i = 0; i < gcInfos.length; ++i) {
+            gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
+        }
+        HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
+        // Use "public" versions of network addresses and ports
+        NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
+        NetworkAddress netAddress = netManager.getPublicNetworkAddress();
+        NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
+                : null;
+        int allCores = osMXBean.getAvailableProcessors();
+        nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores,
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort,
+                application.getCapacity(), PidHelper.getPid(), maxJobId.get());
+
+        ccs.registerNode(nodeRegistration);
+
+        synchronized (this) {
+            while (registrationPending) {
+                wait();
+            }
+        }
+        if (registrationException != null) {
+            LOGGER.log(Level.WARNING, "Registering with Cluster Controller failed with exception",
+                    registrationException);
+            throw registrationException;
+        }
+        serviceCtx.setDistributedState(nodeParameters.getDistributedState());
+        application.onRegisterNode();
+        LOGGER.info("Registering with Cluster Controller complete");
+    }
+
     private void startApplication() throws Exception {
         serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig());
         application.start(serviceCtx, ncConfig.getAppArgsArray());
         executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
     }
 
+    public void updateMaxJobId(JobId jobId) {
+        maxJobId.getAndUpdate(currentMaxId -> Math.max(currentMaxId, jobId.getId()));
+    }
+
     @Override
     public synchronized void stop() throws Exception {
         if (!shuttedDown) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
index 807142b..486a420 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
@@ -46,6 +46,7 @@
     public void run() {
         try {
             ncs.checkForDuplicateDistributedJob(jobId);
+            ncs.updateMaxJobId(jobId);
             ActivityClusterGraph acg =
                     (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getContext());
             ncs.storeActivityClusterGraph(jobId, acg);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 95f3e83..b55cd4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -103,6 +103,7 @@
     public void run() {
         Task task = null;
         try {
+            ncs.updateMaxJobId(jobId);
             NCServiceContext serviceCtx = ncs.getContext();
             Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, serviceCtx, acgBytes);
             final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
index 00af38b..9da3502 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.nc.service;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -30,6 +31,7 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
@@ -170,8 +172,11 @@
                     // If the directory IS there, all is well
                 }
                 File logfile = new File(config.logdir, "nc-" + ncId + ".log");
-                // Don't care if this succeeds or fails:
-                logfile.delete();
+                try (FileWriter writer = new FileWriter(logfile, true)) {
+                    writer.write("---------------------\n");
+                    writer.write(new Date() + "\n");
+                    writer.write("---------------------\n");
+                }
                 pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logfile));
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Logging to " + logfile.getCanonicalPath());
@@ -254,6 +259,11 @@
             public void run() {
                 if (proc != null) {
                     proc.destroy();
+                    try {
+                        proc.waitFor();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
                 }
             }
         });
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index b7aa342..2967039 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -40,6 +40,11 @@
     }
 
     @Override
+    public void onRegisterNode() throws Exception {
+        // No-op
+    }
+
+    @Override
     public void stop() throws Exception {
         // No-op
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 1fcdb3c..efd9830 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -192,4 +192,9 @@
     boolean full() {
         return full;
     }
+
+    @Override
+    public String toString() {
+        return "IPCHandle [addr=" + remoteAddress + " state=" + state + "]";
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksServerProcess.java b/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksServerProcess.java
index 13cb445..7bb3332 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksServerProcess.java
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksServerProcess.java
@@ -19,10 +19,8 @@
 package org.apache.hyracks.server.process;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -51,7 +49,9 @@
                 LOGGER.info("Logging to: " + logFile.getCanonicalPath());
             }
             logFile.getParentFile().mkdirs();
-            logFile.delete();
+            try (FileWriter writer = new FileWriter(logFile, true)) {
+                writer.write("---------------------\n");
+            }
             pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
         } else {
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -71,6 +71,19 @@
         }
     }
 
+    public void stop(boolean forcibly) {
+        if (forcibly) {
+            process.destroyForcibly();
+        } else {
+            process.destroy();
+        }
+        try {
+            process.waitFor();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     private String[] buildCommand() {
         List<String> cList = new ArrayList<String>();
         cList.add(getJavaCommand());
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksVirtualCluster.java b/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksVirtualCluster.java
index cc0ecf7..e1590e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksVirtualCluster.java
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksVirtualCluster.java
@@ -51,10 +51,11 @@
      * @param configFile - full path to an ncservice.conf. May be null to accept all defaults.
      * @throws IOException - if there are errors starting the process.
      */
-    public void addNCService(File configFile, File logFile) throws IOException {
+    public HyracksNCServiceProcess addNCService(File configFile, File logFile) throws IOException {
         HyracksNCServiceProcess proc = new HyracksNCServiceProcess(configFile, logFile, appHome, workingDir);
         proc.start();
         ncProcs.add(proc);
+        return proc;
     }
 
     /**
@@ -64,9 +65,10 @@
      *                     defaults, although this is seldom useful since there are no NCs.
      * @throws IOException - if there are errors starting the process.
      */
-    public void start(File ccConfigFile, File logFile) throws IOException {
+    public HyracksCCProcess start(File ccConfigFile, File logFile) throws IOException {
         ccProc = new HyracksCCProcess(ccConfigFile, logFile, appHome, workingDir);
         ccProc.start();
+        return ccProc;
     }
 
     /**