[ASTERIXDB-2198][CLUS] Introduce NodeJobTracker

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Add INodeJobTracker to ICcApplicationContext.

Details:
- Add NodeJobTracker to track each node pending jobs.
- Ensure IJobLifecycleListeners are notified about job
  creation as soon as the job is received by the CC.
- Add unit test for NodeJobTracker.

Change-Id: Ie5638a6382b0ae0509a2aeeb80dee3db8e7657bb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2220
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 4e75bf7..8283257 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -59,6 +59,7 @@
 import org.apache.asterix.app.replication.FaultToleranceStrategyFactory;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.ExternalProperties;
@@ -163,6 +164,9 @@
         webManager.start();
         ClusterManagerProvider.getClusterManager().registerSubscriber(globalRecoveryManager);
         ccServiceCtx.addClusterLifecycleListener(new ClusterLifecycleListener(appCtx));
+        final INodeJobTracker nodeJobTracker = appCtx.getNodeJobTracker();
+        ccServiceCtx.addJobLifecycleListener(nodeJobTracker);
+        ccServiceCtx.addClusterLifecycleListener(nodeJobTracker);
 
         jobCapacityController = new JobCapacityController(controllerService.getResourceManager());
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
new file mode 100644
index 0000000..4503620
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.Set;
+
+import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
+
+public interface INodeJobTracker extends IJobLifecycleListener, IClusterLifecycleListener {
+
+    /**
+     * Gets node {@code nodeId} pending jobs. If the node is not active,
+     * an empty set is returned.
+     *
+     * @param nodeId
+     * @return unmodifiable set of the node pending jobs.
+     */
+    Set<JobId> getPendingJobs(String nodeId);
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 3f4d6a6..6181ade 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -20,6 +20,7 @@
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.ExtensionProperties;
@@ -115,4 +116,9 @@
      * @return the extension properties
      */
     ExtensionProperties getExtensionProperties();
+
+    /**
+     * @return the node job tracker
+     */
+    INodeJobTracker getNodeJobTracker();
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
new file mode 100644
index 0000000..75c5582
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.job.listener;
+
+import static org.apache.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class NodeJobTracker implements INodeJobTracker {
+
+    private final Map<String, Set<JobId>> nodeJobs = new HashMap<>();
+
+    @Override
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
+        final Set<String> matchedNodes = spec.getUserConstraints().stream().map(Constraint::getRValue)
+                .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast)
+                .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
+                .collect(Collectors.toSet());
+        matchedNodes.stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
+    }
+
+    @Override
+    public synchronized void notifyJobStart(JobId jobId) {
+        // nothing to do
+    }
+
+    @Override
+    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+        nodeJobs.values().forEach(jobsSet -> jobsSet.remove(jobId));
+    }
+
+    @Override
+    public synchronized void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) {
+        nodeJobs.computeIfAbsent(nodeId, key -> new HashSet<>());
+    }
+
+    @Override
+    public synchronized void notifyNodeFailure(Collection<String> deadNodeIds) {
+        deadNodeIds.forEach(nodeJobs::remove);
+    }
+
+    @Override
+    public synchronized Set<JobId> getPendingJobs(String nodeId) {
+        return nodeJobs.containsKey(nodeId) ?
+                Collections.unmodifiableSet(nodeJobs.get(nodeId)) :
+                Collections.emptySet();
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 63d3d6f..c3ef5b3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -22,6 +22,7 @@
 import java.util.function.Supplier;
 
 import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.ActiveProperties;
@@ -42,6 +43,7 @@
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.runtime.job.listener.NodeJobTracker;
 import org.apache.asterix.runtime.transaction.ResourceIdManager;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.ICCServiceContext;
@@ -81,6 +83,7 @@
     private IJobLifecycleListener activeLifeCycleListener;
     private IMetadataLockManager mdLockManager;
     private IClusterStateManager clusterStateManager;
+    private final INodeJobTracker nodeJobTracker;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
             ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
@@ -114,6 +117,7 @@
         clusterStateManager = new ClusterStateManager();
         clusterStateManager.setCcAppCtx(this);
         this.resourceIdManager = new ResourceIdManager(clusterStateManager);
+        nodeJobTracker = new NodeJobTracker();
     }
 
     @Override
@@ -251,4 +255,9 @@
     public IClusterStateManager getClusterStateManager() {
         return clusterStateManager;
     }
+
+    @Override
+    public INodeJobTracker getNodeJobTracker() {
+        return nodeJobTracker;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
new file mode 100644
index 0000000..186dba8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.job.listener;
+
+import java.util.Collections;
+
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class NodeJobTrackerTest {
+
+    @Test
+    public void hasPendingJobsTest() {
+        final String nc1 = "nc1";
+        final String nc2 = "nc2";
+        final String unknown = "unknown";
+        final NodeJobTracker nodeJobTracker = new NodeJobTracker();
+        nodeJobTracker.notifyNodeJoin(nc1, null);
+        nodeJobTracker.notifyNodeJoin(nc2, null);
+
+        JobSpecification jobSpec = new JobSpecification();
+        // add nc1 and some other unknown location
+        final ConstantExpression nc1Location = new ConstantExpression(nc1);
+        final ConstantExpression unknownLocation = new ConstantExpression(unknown);
+        final LValueConstraintExpression lValueMock = Mockito.mock(LValueConstraintExpression.class);
+        jobSpec.getUserConstraints().add(new Constraint(lValueMock, nc1Location));
+        jobSpec.getUserConstraints().add(new Constraint(lValueMock, unknownLocation));
+
+        JobId jobId = new JobId(1);
+        nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+        // make sure nc1 has a pending job
+        Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
+        Assert.assertTrue(nodeJobTracker.getPendingJobs(unknown).isEmpty());
+        Assert.assertTrue(nodeJobTracker.getPendingJobs(nc2).isEmpty());
+        nodeJobTracker.notifyJobFinish(jobId, JobStatus.TERMINATED, null);
+        // make sure nc1 doesn't have pending jobs anymore
+        Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
+
+        // make sure node doesn't have pending jobs after failure
+        jobId = new JobId(2);
+        nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+        Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
+        nodeJobTracker.notifyNodeFailure(Collections.singleton(nc1));
+        Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 26f80228..7f1100b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -106,6 +106,8 @@
         checkJob(jobRun);
         JobSpecification job = jobRun.getJobSpecification();
         IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
+        CCServiceContext serviceCtx = ccs.getContext();
+        serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
         switch (status) {
             case QUEUE:
                 queueJob(jobRun);
@@ -304,10 +306,6 @@
         run.setStartTime(System.currentTimeMillis());
         JobId jobId = run.getJobId();
         activeRunMap.put(jobId, run);
-
-        CCServiceContext serviceCtx = ccs.getContext();
-        JobSpecification spec = run.getJobSpecification();
-        serviceCtx.notifyJobCreation(jobId, spec);
         run.setStatus(JobStatus.RUNNING, null);
         executeJobInternal(run);
     }