[NO ISSUE][RT] Make start and cancel job uninterruptible
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Previously, a Hyracks data connection that starts
a job might be interrupted before it receives the
job id. This would leak resources since that job
will run even though, no one is going to read its
result.
- Similarly, job cancellation can be interrupted and
so the job which was meant to be cancelled will
continue running.
- To avoid this, a new thread is added to Hyracks
Client Connection which takes care of starting
and cancelling of jobs. The thread submitting these
requests will be un-interruptible until those calls
return.
Change-Id: I27b2aaae902b19829bd2df2ae04c5e704f5ca8e8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2639
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
index edac0fa..f22693a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
@@ -33,8 +33,6 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -53,19 +51,14 @@
/** a map from the NC name to the index */
private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
- /** a map from NC name to the NodeControllerInfo */
- private Map<String, NodeControllerInfo> ncNameToNcInfos;
-
/**
* The constructor of the scheduler.
*
* @param ncNameToNcInfos
* @throws HyracksException
*/
- public IndexingScheduler(String ipAddress, int port) throws HyracksException {
+ public IndexingScheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- this.ncNameToNcInfos = hcc.getNodeControllerInfos();
loadIPAddressToNCMap(ncNameToNcInfos);
} catch (Exception e) {
throw HyracksException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index bd50352..1b13ec5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -67,11 +68,11 @@
public static IndexingScheduler initializeIndexingHDFSScheduler(ICCServiceContext serviceCtx)
throws HyracksDataException {
- ICCContext ccContext = serviceCtx.getCCContext();
IndexingScheduler scheduler = null;
try {
- scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
- ccContext.getClusterControllerInfo().getClientNetPort());
+ ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+ IHyracksClientConnection hcc = appCtx.getHcc();
+ scheduler = new IndexingScheduler(hcc.getNodeControllerInfos());
} catch (HyracksException e) {
throw new RuntimeDataException(ErrorCode.UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index f635d94..cfa6f78 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -26,6 +26,11 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -44,10 +49,16 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.ipc.api.RPCInterface;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.InterruptibleAction;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
@@ -56,6 +67,9 @@
* @author vinayakb
*/
public final class HyracksConnection implements IHyracksClientConnection {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
private final String ccHost;
private final int ccPort;
@@ -66,6 +80,15 @@
private final ClusterControllerInfo ccInfo;
+ private volatile boolean running = false;
+
+ private volatile long reqId = 0L;
+
+ private final ExecutorService uninterruptibleExecutor = Executors.newFixedThreadPool(2,
+ r -> new Thread(r, "HyracksConnection Uninterrubtible thread: " + r.getClass().getSimpleName()));
+
+ private final BlockingQueue<UnInterruptibleRequest<?>> uninterruptibles = new ArrayBlockingQueue<>(1);
+
/**
* Constructor to create a connection to the Hyracks Cluster Controller.
*
@@ -86,6 +109,8 @@
hci = new HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new InetSocketAddress(ccHost, ccPort)),
rpci);
ccInfo = hci.getClusterControllerInfo();
+ uninterruptibleExecutor.execute(new UninterrubtileRequestHandler());
+ uninterruptibleExecutor.execute(new UninterrubtileHandlerWatcher());
}
@Override
@@ -95,7 +120,8 @@
@Override
public void cancelJob(JobId jobId) throws Exception {
- hci.cancelJob(jobId);
+ CancelJobRequest request = new CancelJobRequest(jobId);
+ uninterruptiblySubmitAndExecute(request);
}
@Override
@@ -131,12 +157,13 @@
@Override
public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception {
- return hci.startJob(deployedJobSpecId, jobParameters);
+ StartDeployedJobRequest request = new StartDeployedJobRequest(deployedJobSpecId, jobParameters);
+ return interruptiblySubmitAndExecute(request);
}
@Override
public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
- return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
+ return startJob(null, acggf, jobFlags);
}
public DeployedJobSpecId deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
@@ -154,7 +181,7 @@
hci.waitForCompletion(jobId);
} catch (InterruptedException e) {
// Cancels an on-going job if the current thread gets interrupted.
- hci.cancelJob(jobId);
+ cancelJob(jobId);
throw e;
}
}
@@ -232,7 +259,8 @@
@Override
public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
EnumSet<JobFlag> jobFlags) throws Exception {
- return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
+ StartJobRequest request = new StartJobRequest(deploymentId, acggf, jobFlags);
+ return interruptiblySubmitAndExecute(request);
}
@Override
@@ -269,4 +297,162 @@
public boolean isConnected() {
return hci.isConnected();
}
+
+ private <T> T uninterruptiblySubmitAndExecute(UnInterruptibleRequest<T> request) throws Exception {
+ InvokeUtil.doUninterruptibly(() -> uninterruptibles.put(request));
+ return uninterruptiblyExecute(request);
+ }
+
+ private <T> T uninterruptiblyExecute(UnInterruptibleRequest<T> request) throws Exception {
+ InvokeUtil.doUninterruptibly(request);
+ return request.result();
+ }
+
+ private <T> T interruptiblySubmitAndExecute(UnInterruptibleRequest<T> request) throws Exception {
+ uninterruptibles.put(request);
+ return uninterruptiblyExecute(request);
+ }
+
+ private abstract class UnInterruptibleRequest<T> implements InterruptibleAction {
+ boolean completed = false;
+ boolean failed = false;
+ Throwable failure = null;
+ T response = null;
+
+ @SuppressWarnings("squid:S1181")
+ private final void handle() {
+ try {
+ response = doHandle();
+ } catch (Throwable th) {
+ failed = true;
+ failure = th;
+ } finally {
+ synchronized (this) {
+ completed = true;
+ notifyAll();
+ }
+ }
+ }
+
+ protected abstract T doHandle() throws Exception;
+
+ @Override
+ public final synchronized void run() throws InterruptedException {
+ while (!completed) {
+ wait();
+ }
+ }
+
+ public T result() throws Exception {
+ if (failed) {
+ if (failure instanceof Error) {
+ throw (Error) failure;
+ }
+ throw (Exception) failure;
+ }
+ return response;
+ }
+ }
+
+ private class CancelJobRequest extends UnInterruptibleRequest<Void> {
+ final JobId jobId;
+
+ public CancelJobRequest(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ protected Void doHandle() throws Exception {
+ hci.cancelJob(jobId);
+ return null;
+ }
+
+ }
+
+ private class StartDeployedJobRequest extends UnInterruptibleRequest<JobId> {
+
+ private final DeployedJobSpecId deployedJobSpecId;
+ private final Map<byte[], byte[]> jobParameters;
+
+ public StartDeployedJobRequest(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) {
+ this.deployedJobSpecId = deployedJobSpecId;
+ this.jobParameters = jobParameters;
+ }
+
+ @Override
+ protected JobId doHandle() throws Exception {
+ return hci.startJob(deployedJobSpecId, jobParameters);
+ }
+
+ }
+
+ private class StartJobRequest extends UnInterruptibleRequest<JobId> {
+ private final DeploymentId deploymentId;
+ private final IActivityClusterGraphGeneratorFactory acggf;
+ private final EnumSet<JobFlag> jobFlags;
+
+ public StartJobRequest(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
+ EnumSet<JobFlag> jobFlags) {
+ this.deploymentId = deploymentId;
+ this.acggf = acggf;
+ this.jobFlags = jobFlags;
+ }
+
+ @Override
+ protected JobId doHandle() throws Exception {
+ if (deploymentId == null) {
+ return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
+ } else {
+ return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
+ }
+ }
+
+ }
+
+ private class UninterrubtileRequestHandler implements Runnable {
+ @SuppressWarnings({ "squid:S2189", "squid:S2142" })
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ UnInterruptibleRequest<?> next = uninterruptibles.take();
+ reqId++;
+ running = true;
+ next.handle();
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+ continue;
+ } finally {
+ running = false;
+ }
+ }
+ }
+ }
+
+ public class UninterrubtileHandlerWatcher implements Runnable {
+ @Override
+ @SuppressWarnings({ "squid:S2189", "squid:S2142" })
+ public void run() {
+ long currentReqId = 0L;
+ long currentTime = System.nanoTime();
+ while (true) {
+ try {
+ TimeUnit.MINUTES.sleep(1);
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+ continue;
+ }
+ if (running) {
+ if (reqId == currentReqId) {
+ if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) {
+ ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST);
+ }
+ } else {
+ currentReqId = reqId;
+ currentTime = System.nanoTime();
+ }
+ }
+ }
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 4aa123b..c8b9112 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -34,6 +34,7 @@
public static final int EC_FAILED_TO_STARTUP = 2;
public static final int EC_FAILED_TO_RECOVER = 3;
public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
+ public static final int EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST = 5;
public static final int EC_UNHANDLED_EXCEPTION = 11;
public static final int EC_IMMEDIATE_HALT = 33;
public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;