[ASTERIXDB-2198][CLUS] Introduce NodeJobTracker
- user model changes: no
- storage format changes: no
- interface changes: yes
- Add INodeJobTracker to ICcApplicationContext.
Details:
- Add NodeJobTracker to track each node pending jobs.
- Ensure IJobLifecycleListeners are notified about job
creation as soon as the job is received by the CC.
- Add unit test for NodeJobTracker.
Change-Id: Ie5638a6382b0ae0509a2aeeb80dee3db8e7657bb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2220
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 4e75bf7..8283257 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -59,6 +59,7 @@
import org.apache.asterix.app.replication.FaultToleranceStrategyFactory;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.INodeJobTracker;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.ExternalProperties;
@@ -163,6 +164,9 @@
webManager.start();
ClusterManagerProvider.getClusterManager().registerSubscriber(globalRecoveryManager);
ccServiceCtx.addClusterLifecycleListener(new ClusterLifecycleListener(appCtx));
+ final INodeJobTracker nodeJobTracker = appCtx.getNodeJobTracker();
+ ccServiceCtx.addJobLifecycleListener(nodeJobTracker);
+ ccServiceCtx.addClusterLifecycleListener(nodeJobTracker);
jobCapacityController = new JobCapacityController(controllerService.getResourceManager());
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
new file mode 100644
index 0000000..4503620
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.Set;
+
+import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
+
+public interface INodeJobTracker extends IJobLifecycleListener, IClusterLifecycleListener {
+
+ /**
+ * Gets node {@code nodeId} pending jobs. If the node is not active,
+ * an empty set is returned.
+ *
+ * @param nodeId
+ * @return unmodifiable set of the node pending jobs.
+ */
+ Set<JobId> getPendingJobs(String nodeId);
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 3f4d6a6..6181ade 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -20,6 +20,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.api.INodeJobTracker;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.ExtensionProperties;
@@ -115,4 +116,9 @@
* @return the extension properties
*/
ExtensionProperties getExtensionProperties();
+
+ /**
+ * @return the node job tracker
+ */
+ INodeJobTracker getNodeJobTracker();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
new file mode 100644
index 0000000..75c5582
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.job.listener;
+
+import static org.apache.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class NodeJobTracker implements INodeJobTracker {
+
+ private final Map<String, Set<JobId>> nodeJobs = new HashMap<>();
+
+ @Override
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
+ final Set<String> matchedNodes = spec.getUserConstraints().stream().map(Constraint::getRValue)
+ .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast)
+ .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
+ .collect(Collectors.toSet());
+ matchedNodes.stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
+ }
+
+ @Override
+ public synchronized void notifyJobStart(JobId jobId) {
+ // nothing to do
+ }
+
+ @Override
+ public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+ nodeJobs.values().forEach(jobsSet -> jobsSet.remove(jobId));
+ }
+
+ @Override
+ public synchronized void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) {
+ nodeJobs.computeIfAbsent(nodeId, key -> new HashSet<>());
+ }
+
+ @Override
+ public synchronized void notifyNodeFailure(Collection<String> deadNodeIds) {
+ deadNodeIds.forEach(nodeJobs::remove);
+ }
+
+ @Override
+ public synchronized Set<JobId> getPendingJobs(String nodeId) {
+ return nodeJobs.containsKey(nodeId) ?
+ Collections.unmodifiableSet(nodeJobs.get(nodeId)) :
+ Collections.emptySet();
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 63d3d6f..c3ef5b3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -22,6 +22,7 @@
import java.util.function.Supplier;
import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.api.INodeJobTracker;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.ActiveProperties;
@@ -42,6 +43,7 @@
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.runtime.job.listener.NodeJobTracker;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.ICCServiceContext;
@@ -81,6 +83,7 @@
private IJobLifecycleListener activeLifeCycleListener;
private IMetadataLockManager mdLockManager;
private IClusterStateManager clusterStateManager;
+ private final INodeJobTracker nodeJobTracker;
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
@@ -114,6 +117,7 @@
clusterStateManager = new ClusterStateManager();
clusterStateManager.setCcAppCtx(this);
this.resourceIdManager = new ResourceIdManager(clusterStateManager);
+ nodeJobTracker = new NodeJobTracker();
}
@Override
@@ -251,4 +255,9 @@
public IClusterStateManager getClusterStateManager() {
return clusterStateManager;
}
+
+ @Override
+ public INodeJobTracker getNodeJobTracker() {
+ return nodeJobTracker;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
new file mode 100644
index 0000000..186dba8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.job.listener;
+
+import java.util.Collections;
+
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class NodeJobTrackerTest {
+
+ @Test
+ public void hasPendingJobsTest() {
+ final String nc1 = "nc1";
+ final String nc2 = "nc2";
+ final String unknown = "unknown";
+ final NodeJobTracker nodeJobTracker = new NodeJobTracker();
+ nodeJobTracker.notifyNodeJoin(nc1, null);
+ nodeJobTracker.notifyNodeJoin(nc2, null);
+
+ JobSpecification jobSpec = new JobSpecification();
+ // add nc1 and some other unknown location
+ final ConstantExpression nc1Location = new ConstantExpression(nc1);
+ final ConstantExpression unknownLocation = new ConstantExpression(unknown);
+ final LValueConstraintExpression lValueMock = Mockito.mock(LValueConstraintExpression.class);
+ jobSpec.getUserConstraints().add(new Constraint(lValueMock, nc1Location));
+ jobSpec.getUserConstraints().add(new Constraint(lValueMock, unknownLocation));
+
+ JobId jobId = new JobId(1);
+ nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+ // make sure nc1 has a pending job
+ Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
+ Assert.assertTrue(nodeJobTracker.getPendingJobs(unknown).isEmpty());
+ Assert.assertTrue(nodeJobTracker.getPendingJobs(nc2).isEmpty());
+ nodeJobTracker.notifyJobFinish(jobId, JobStatus.TERMINATED, null);
+ // make sure nc1 doesn't have pending jobs anymore
+ Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
+
+ // make sure node doesn't have pending jobs after failure
+ jobId = new JobId(2);
+ nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+ Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
+ nodeJobTracker.notifyNodeFailure(Collections.singleton(nc1));
+ Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 26f80228..7f1100b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -106,6 +106,8 @@
checkJob(jobRun);
JobSpecification job = jobRun.getJobSpecification();
IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
+ CCServiceContext serviceCtx = ccs.getContext();
+ serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
switch (status) {
case QUEUE:
queueJob(jobRun);
@@ -304,10 +306,6 @@
run.setStartTime(System.currentTimeMillis());
JobId jobId = run.getJobId();
activeRunMap.put(jobId, run);
-
- CCServiceContext serviceCtx = ccs.getContext();
- JobSpecification spec = run.getJobSpecification();
- serviceCtx.notifyJobCreation(jobId, spec);
run.setStatus(JobStatus.RUNNING, null);
executeJobInternal(run);
}