[NO ISSUE][HYR] Notify CC of NC shutdown only after shutdown is complete
- close NC IPC manager after sending shutdown notifcation to CC
Change-Id: Idde1f69a0e0a9a948898d9271441ca95485b77f4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2159
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
similarity index 75%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
index 4c65c66..c7ac0f4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
@@ -18,7 +18,16 @@
*/
package org.apache.asterix.common.utils;
-public class InterruptUtil {
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class InvokeUtil {
+
+ private static final Logger LOGGER = Logger.getLogger(InvokeUtil.class.getName());
+
/**
* Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
* completes, the current thread will be re-interrupted, if the original operation was interrupted.
@@ -82,7 +91,7 @@
}
/**
- * Executes the passed interruptible, retrying if the operation is interrupted. If the operation throws an
+ * Executes the passed interruptible, retrying if the operation is interrupted. If the operation throws an
* exception after being previously interrupted, the current thread will be re-interrupted.
*
* @return true if the original operation was interrupted, otherwise false
@@ -106,6 +115,33 @@
return interrupted;
}
+ public static boolean retryLoop(long duration, TimeUnit durationUnit, long delay, TimeUnit delayUnit,
+ Callable<Boolean> function) throws IOException {
+ long endTime = System.nanoTime() + durationUnit.toNanos(duration);
+ boolean first = true;
+ while (endTime - System.nanoTime() > 0) {
+ if (first) {
+ first = false;
+ } else {
+ try {
+ delayUnit.sleep(delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ try {
+ if (function.call()) {
+ return true;
+ }
+ } catch (Exception e) {
+ // ignore, retry after delay
+ LOGGER.log(Level.FINE, "Ignoring exception on retryLoop attempt, will retry after delay", e);
+ }
+ }
+ return false;
+ }
+
@FunctionalInterface
public interface Interruptible {
void run() throws InterruptedException;
@@ -115,4 +151,5 @@
public interface ThrowingInterruptible {
void run() throws Exception; // NOSONAR
}
+
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 4d671f3..dd0a5c7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -51,7 +51,7 @@
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.transactions.MutableLong;
import org.apache.asterix.common.transactions.TxnLogFile;
-import org.apache.asterix.common.utils.InterruptUtil;
+import org.apache.asterix.common.utils.InvokeUtil;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
public class LogManager implements ILogManager, ILifeCycleComponent {
@@ -655,7 +655,7 @@
public void terminate() {
// make sure the LogFlusher thread started before terminating it.
- InterruptUtil.doUninterruptibly(started::acquire);
+ InvokeUtil.doUninterruptibly(started::acquire);
stopping = true;
@@ -665,7 +665,7 @@
currentFlushPage.stop();
}
// finally we put a POISON_PILL onto the flushQ to indicate to the flusher it is time to exit
- InterruptUtil.doUninterruptibly(() -> flushQ.put(POISON_PILL));
+ InvokeUtil.doUninterruptibly(() -> flushQ.put(POISON_PILL));
}
@Override
@@ -675,7 +675,7 @@
try {
while (true) {
flushPage = null;
- interrupted = InterruptUtil.doUninterruptiblyGet(() -> flushPage = flushQ.take()) || interrupted;
+ interrupted = InvokeUtil.doUninterruptiblyGet(() -> flushPage = flushQ.take()) || interrupted;
if (flushPage == POISON_PILL) {
return true;
}
diff --git a/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/Utils.java b/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/Utils.java
index de9ec90..675d4ab 100644
--- a/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/Utils.java
+++ b/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/Utils.java
@@ -28,6 +28,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.xml.bind.JAXBContext;
@@ -35,7 +37,17 @@
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
-import org.apache.commons.httpclient.*;
+import org.apache.asterix.common.configuration.AsterixConfiguration;
+import org.apache.asterix.common.utils.InvokeUtil;
+import org.apache.asterix.event.schema.yarnCluster.Cluster;
+import org.apache.asterix.event.schema.yarnCluster.Node;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpException;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.HttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.NameValuePair;
+import org.apache.commons.httpclient.NoHttpResponseException;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.params.HttpMethodParams;
@@ -49,10 +61,6 @@
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.asterix.common.configuration.AsterixConfiguration;
-import org.apache.asterix.event.schema.yarnCluster.Cluster;
-import org.apache.asterix.event.schema.yarnCluster.Node;
-
public class Utils {
private Utils() {
@@ -105,11 +113,15 @@
//do nothing... this is expected
}
//now let's test that the instance is really down, or throw an exception
- try {
- executeHTTPCall(method);
- } catch (ConnectException e) {
- return;
- }
+ InvokeUtil.retryLoop(1, TimeUnit.MINUTES, 500, TimeUnit.MILLISECONDS, () -> {
+ try {
+ executeHTTPCall(method);
+ } catch (ConnectException e) {
+ //do nothing... this is expected
+ return true;
+ }
+ return false;
+ });
throw new IOException("Instance did not shut down cleanly.");
}
@@ -142,7 +154,7 @@
if (result == null) {
return false;
}
- if(method.getStatusCode() != HttpStatus.SC_OK){
+ if (method.getStatusCode() != HttpStatus.SC_OK) {
return false;
}
return true;
@@ -237,7 +249,7 @@
* @throws IOException
*/
public static void listBackups(Configuration conf, String confDirRel, String instance) throws IOException {
- List<String> backups = getBackups(conf,confDirRel,instance);
+ List<String> backups = getBackups(conf, confDirRel, instance);
if (backups.size() != 0) {
System.out.println("Backups for instance " + instance + ": ");
for (String name : backups) {
@@ -247,20 +259,22 @@
System.out.println("No backups found for instance " + instance + ".");
}
}
- /**
- * Return the available snapshot names
- * @param conf
- * @param confDirRel
- * @param instance
- * @return
- * @throws IOException
- */
- public static List<String> getBackups(Configuration conf, String confDirRel, String instance) throws IOException{
+
+ /**
+ * Return the available snapshot names
+ *
+ * @param conf
+ * @param confDirRel
+ * @param instance
+ * @return
+ * @throws IOException
+ */
+ public static List<String> getBackups(Configuration conf, String confDirRel, String instance) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups");
FileStatus[] backups = fs.listStatus(backupFolder);
List<String> backupNames = new ArrayList<String>();
- for(FileStatus f: backups){
+ for (FileStatus f : backups) {
backupNames.add(f.getPath().getName());
}
return backupNames;
@@ -441,8 +455,8 @@
return waitForLiveness(appId, false, true, message, yarnClient, "", null, port);
}
- public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, int port) throws YarnException,
- IOException, JAXBException {
+ public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, int port)
+ throws YarnException, IOException, JAXBException {
return waitForLiveness(appId, false, false, "", yarnClient, "", null, port);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java
index 5119022..83cbb91 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java
@@ -27,9 +27,9 @@
public class NotifyShutdownWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(NotifyShutdownWork.class.getName());
private final ClusterControllerService ccs;
private final String nodeId;
- private static Logger LOGGER = Logger.getLogger(NotifyShutdownWork.class.getName());
public NotifyShutdownWork(ClusterControllerService ccs, String nodeId) {
this.ccs = ccs;
@@ -41,8 +41,12 @@
public void doRun() {
// Triggered remotely by a NC to notify that the NC is shutting down.
ShutdownRun sRun = ccs.getShutdownRun();
- LOGGER.info("Received shutdown acknowledgement from NC ID:" + nodeId);
- sRun.notifyShutdown(nodeId);
+ if (sRun != null) {
+ LOGGER.info("Received shutdown acknowledgement from node " + nodeId);
+ sRun.notifyShutdown(nodeId);
+ } else {
+ LOGGER.info("Received unsolicted shutdown notification from node " + nodeId);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index c54f153..b220039 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -122,7 +122,7 @@
case SHUTDOWN_REQUEST:
final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn;
- ncs.getExecutor().submit(new ShutdownTask(ncs, sdrf.isTerminateNCService()));
+ ncs.getExecutor().submit(new ShutdownTask(sdrf.isTerminateNCService()));
return;
case THREAD_DUMP_REQUEST:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 9dd9536..a3a9ac5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -409,6 +409,13 @@
heartbeatThread.interrupt();
heartbeatThread.join(1000); // give it 1s to stop gracefully
}
+ try {
+ ccs.notifyShutdown(id);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception notifying CC of shutdown", e);
+ }
+ ipc.stop();
+
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
} else {
LOGGER.log(Level.SEVERE, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java
index e9cf3cb..4dd57f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java
@@ -19,34 +19,17 @@
package org.apache.hyracks.control.nc.task;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.control.common.base.IClusterController;
-import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.util.ExitUtil;
public class ShutdownTask implements Runnable {
- private static final Logger LOGGER = Logger.getLogger(ShutdownTask.class.getName());
- private final NodeControllerService ncs;
private final boolean terminateNCService;
- public ShutdownTask(NodeControllerService ncs, boolean terminateNCService) {
- this.ncs = ncs;
+ public ShutdownTask(boolean terminateNCService) {
this.terminateNCService = terminateNCService;
}
@Override
- @SuppressWarnings("squid:S1147") // Runtime.exit()
public void run() {
- IClusterController ccs = ncs.getClusterController();
- try {
- ccs.notifyShutdown(ncs.getId());
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Exception notifying CC of shutdown acknowledgment", e);
- // proceed with shutdown
- }
-
ExitUtil.exit(terminateNCService ? 99 : 0);
}