Merge commit 'c2d19a558508bc6397ca044430546ce5be4e0872' from release-0.9.4-pre-rc
Change-Id: Ic58b735c4bca6891ae35fe301bb4bc44889df0d4
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index b581321..57474ef 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -29,11 +29,11 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
@@ -71,13 +71,13 @@
final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
long replicationTimeOut = TimeUnit.SECONDS.toMillis(appCtx.getReplicationProperties().getReplicationTimeOut());
- final long startTime = System.nanoTime();
synchronized (indexCheckpointManager) {
// wait until the lsn mapping is flushed to disk
while (!indexCheckpointManager.isFlushed(masterLsn)) {
if (replicationTimeOut <= 0) {
throw new ReplicationException(new TimeoutException("Couldn't receive flush lsn from master"));
}
+ final long startTime = System.nanoTime();
indexCheckpointManager.wait(replicationTimeOut);
replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 2235752..bf1f9dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -40,6 +40,17 @@
return new HyracksException(cause);
}
+ public static HyracksException wrapOrThrowUnchecked(Throwable cause) {
+ if (cause instanceof Error) {
+ throw (Error) cause;
+ } else if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else if (cause instanceof HyracksException) {
+ return (HyracksException) cause;
+ }
+ return new HyracksException(cause);
+ }
+
public static HyracksException create(int code, Serializable... params) {
return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index fb2bdeb..b2dd680 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -242,14 +242,13 @@
*/
public static void runWithTimeout(ThrowingAction action, BooleanSupplier stopCondition, long timeout, TimeUnit unit)
throws Exception {
- long remainingTime = unit.toNanos(timeout);
+ final long waitTime = unit.toNanos(timeout);
final long startTime = System.nanoTime();
while (!stopCondition.getAsBoolean()) {
- if (remainingTime <= 0) {
+ action.run();
+ if (System.nanoTime() - startTime >= waitTime) {
throw new TimeoutException("Stop condition was not met after " + unit.toSeconds(timeout) + " seconds.");
}
- action.run();
- remainingTime -= System.nanoTime() - startTime;
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 9fd1a02..3ba25f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -31,12 +31,14 @@
import java.util.Set;
import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.application.CCServiceContext;
@@ -65,8 +67,7 @@
private final IJobCapacityController jobCapacityController;
private IJobQueue jobQueue;
- public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController)
- throws HyracksException {
+ public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) {
this.ccs = ccs;
this.jobCapacityController = jobCapacityController;
try {
@@ -116,6 +117,8 @@
case EXECUTE:
executeJob(jobRun);
break;
+ default:
+ throw new IllegalStateException("unknown submission status: " + status);
}
}
@@ -136,9 +139,18 @@
List<Exception> exceptions =
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
// Since the job has not been executed, we only need to update its status and lifecyle here.
- jobRun.setStatus(JobStatus.FAILURE, exceptions);
+ jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
runMapArchive.put(jobId, jobRun);
runMapHistory.put(jobId, exceptions);
+ CCServiceContext serviceCtx = ccs.getContext();
+ if (serviceCtx != null) {
+ try {
+ serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+ } catch (Exception e) {
+ LOGGER.error("Exception notifying cancel on pending job {}", jobId, e);
+ throw HyracksDataException.create(e);
+ }
+ }
}
callback.setValue(null);
}
@@ -152,14 +164,12 @@
finalComplete(run);
return;
}
- JobId jobId = run.getJobId();
- HyracksException caughtException = null;
if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) {
finalComplete(run);
return;
}
if (run.getPendingStatus() != null) {
- LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: " + jobId);
+ LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId);
return;
}
Set<String> targetNodes = run.getParticipatingNodeIds();
@@ -168,38 +178,40 @@
run.setPendingStatus(status, exceptions);
}
- if (targetNodes != null && !targetNodes.isEmpty()) {
- INodeManager nodeManager = ccs.getNodeManager();
- Set<String> toDelete = new HashSet<>();
- for (String n : targetNodes) {
- NodeControllerState ncs = nodeManager.getNodeControllerState(n);
- try {
- if (ncs == null) {
- toDelete.add(n);
- } else {
- ncs.getNodeController().cleanUpJoblet(jobId, status);
- }
- } catch (Exception e) {
- LOGGER.log(Level.ERROR, e.getMessage(), e);
- if (caughtException == null) {
- caughtException = HyracksException.create(e);
- } else {
- caughtException.addSuppressed(e);
- }
- }
- }
- targetNodes.removeAll(toDelete);
- run.getCleanupPendingNodeIds().removeAll(toDelete);
- if (run.getCleanupPendingNodeIds().isEmpty()) {
- finalComplete(run);
- }
+ if (!targetNodes.isEmpty()) {
+ cleanupJobOnNodes(run, status, targetNodes);
} else {
finalComplete(run);
}
+ }
+
+ private void cleanupJobOnNodes(JobRun run, JobStatus status, Set<String> targetNodes) throws HyracksException {
+ Throwable caughtException = null;
+ JobId jobId = run.getJobId();
+ INodeManager nodeManager = ccs.getNodeManager();
+ Set<String> toDelete = new HashSet<>();
+ for (String n : targetNodes) {
+ NodeControllerState ncs = nodeManager.getNodeControllerState(n);
+ if (ncs == null) {
+ toDelete.add(n);
+ } else {
+ try {
+ ncs.getNodeController().cleanUpJoblet(jobId, status);
+ } catch (Exception e) {
+ LOGGER.error("Exception cleaning up joblet {} on node {}", jobId, n, e);
+ caughtException = ExceptionUtils.suppress(caughtException, e);
+ }
+ }
+ }
+ targetNodes.removeAll(toDelete);
+ run.getCleanupPendingNodeIds().removeAll(toDelete);
+ if (run.getCleanupPendingNodeIds().isEmpty()) {
+ finalComplete(run);
+ }
// throws caught exceptions if any
if (caughtException != null) {
- throw caughtException;
+ throw HyracksException.wrapOrThrowUnchecked(caughtException);
}
}
@@ -207,13 +219,13 @@
public void finalComplete(JobRun run) throws HyracksException {
checkJob(run);
JobId jobId = run.getJobId();
- HyracksException caughtException = null;
+ Throwable caughtException = null;
CCServiceContext serviceCtx = ccs.getContext();
if (serviceCtx != null) {
try {
serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
- } catch (HyracksException e) {
- LOGGER.log(Level.ERROR, e.getMessage(), e);
+ } catch (Exception e) {
+ LOGGER.error("Exception notifying job finish {}", jobId, e);
caughtException = e;
}
}
@@ -224,18 +236,14 @@
runMapHistory.put(jobId, run.getExceptions());
if (run.getActivityClusterGraph().isReportTaskDetails()) {
- /**
+ /*
* log job details when profiling is enabled
*/
try {
ccs.getJobLogFile().log(createJobLogObject(run));
} catch (Exception e) {
- LOGGER.log(Level.ERROR, e.getMessage(), e);
- if (caughtException == null) {
- caughtException = HyracksException.create(e);
- } else {
- caughtException.addSuppressed(e);
- }
+ LOGGER.error("Exception reporting task details for job {}", jobId, e);
+ caughtException = ExceptionUtils.suppress(caughtException, e);
}
}
@@ -248,7 +256,7 @@
// throws caught exceptions if any
if (caughtException != null) {
- throw caughtException;
+ throw HyracksException.wrapOrThrowUnchecked(caughtException);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index e4699c7..5b98260 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -209,17 +209,12 @@
}
public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
- Map<Integer, String> locations = operatorLocations.get(op);
- if (locations == null) {
- locations = new HashMap<Integer, String>();
- operatorLocations.put(op, locations);
- }
- locations.put(partition, location);
+ operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location);
}
@Override
public synchronized void waitForCompletion() throws Exception {
- while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) {
+ while (status == JobStatus.PENDING || status == JobStatus.RUNNING) {
wait();
}
if (exceptions != null && !exceptions.isEmpty()) {