[NO ISSUE][TEST] Improve Job Failure Tests
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Add a check that all started jobs finished in multi nc tests.
- Job Cancellation is only completed when the job is final
completed.
Change-Id: I9cdf53a88e07aaa3dc7cd11c5bb7ef9369835da6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1983
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index d36d9b7..cbcc44f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -132,6 +132,7 @@
@Override
public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
throws HyracksException {
+ LOGGER.log(level, "Getting notified of job finish for JobId: " + jobId);
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions)));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index b710f2b..c01bcbc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.app.active;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -90,8 +89,8 @@
((QueryTranslator) statementExecutor).getSessionOutput(), mdProvider, feed, feedConnections,
compilationProvider, storageComponentProvider, statementExecutorFactory, hcc);
JobSpecification feedJob = jobInfo.getLeft();
- WaitForStateSubscriber eventSubscriber =
- new WaitForStateSubscriber(this, Collections.singleton(ActivityState.RUNNING));
+ WaitForStateSubscriber eventSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING,
+ ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED));
feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
// TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
// We will need to design general exception handling mechanism for feeds.
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index e03ee6e..b1191ec 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -165,6 +165,26 @@
}
@Test
+ public void testStartWhenStartFailsCompile() throws Exception {
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+ listener.onStart(Behavior.FAIL_COMPILE);
+ Action action = users[0].startActivity(listener);
+ action.sync();
+ assertFailure(action, 0);
+ Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ }
+
+ @Test
+ public void testStartWhenStartFailsRuntime() throws Exception {
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+ listener.onStart(Behavior.FAIL_RUNTIME);
+ Action action = users[0].startActivity(listener);
+ action.sync();
+ assertFailure(action, 0);
+ Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ }
+
+ @Test
public void testStartWhenOneNodeFinishesBeforeOtherNodeStarts() throws Exception {
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
listener.onStart(Behavior.SUCCEED);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 21c1e77..cc46a7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -43,6 +43,7 @@
import org.apache.hyracks.client.dataset.HyracksDataset;
import org.apache.hyracks.control.cc.BaseCCApplication;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -58,6 +59,7 @@
public abstract class AbstractMultiNCIntegrationTest {
private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName());
+ private static final TestJobLifecycleListener jobLifecycleListener = new TestJobLifecycleListener();
public static final String[] ASTERIX_IDS =
{ "asterix-001", "asterix-002", "asterix-003", "asterix-004", "asterix-005", "asterix-006", "asterix-007" };
@@ -92,7 +94,8 @@
ccConfig.setAppClass(DummyApplication.class.getName());
cc = new ClusterControllerService(ccConfig);
cc.start();
-
+ CCServiceContext serviceCtx = cc.getContext();
+ serviceCtx.addJobLifecycleListener(jobLifecycleListener);
asterixNCs = new NodeControllerService[ASTERIX_IDS.length];
for (int i = 0; i < ASTERIX_IDS.length; i++) {
File ioDev = new File("target" + File.separator + ASTERIX_IDS[i] + File.separator + "ioDevice");
@@ -121,6 +124,7 @@
nc.stop();
}
cc.stop();
+ jobLifecycleListener.check();
}
protected JobId startJob(JobSpecification spec) throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
new file mode 100644
index 0000000..c8d0b9c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+
+public class TestJobLifecycleListener implements IJobLifecycleListener {
+
+ private static final Logger LOGGER = Logger.getLogger(TestJobLifecycleListener.class.getName());
+ private final Map<JobId, JobSpecification> created = new HashMap<>();
+ private final Set<JobId> started = new HashSet<>();
+ private final Set<JobId> finished = new HashSet<>();
+ private final Map<JobId, Integer> doubleCreated = new HashMap<>();
+ private final Map<JobId, Integer> doubleStarted = new HashMap<>();
+ private final Map<JobId, Integer> doubleFinished = new HashMap<>();
+ private final Set<JobId> startWithoutCreate = new HashSet<>();
+ private final Set<JobId> finishWithoutStart = new HashSet<>();
+
+ @Override
+ public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+ if (created.containsKey(jobId)) {
+ LOGGER.log(Level.WARNING, "Job " + jobId + "has been created before");
+ increment(doubleCreated, jobId);
+ }
+ created.put(jobId, spec);
+ }
+
+ private void increment(Map<JobId, Integer> map, JobId jobId) {
+ Integer count = map.get(jobId);
+ count = count == null ? 2 : count + 1;
+ map.put(jobId, count);
+ }
+
+ @Override
+ public void notifyJobStart(JobId jobId) throws HyracksException {
+ if (!created.containsKey(jobId)) {
+ LOGGER.log(Level.WARNING, "Job " + jobId + "has not been created");
+ startWithoutCreate.add(jobId);
+ }
+ if (started.contains(jobId)) {
+ LOGGER.log(Level.WARNING, "Job " + jobId + "has been started before");
+ increment(doubleStarted, jobId);
+ }
+ started.add(jobId);
+ }
+
+ @Override
+ public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+ if (!started.contains(jobId)) {
+ LOGGER.log(Level.WARNING, "Job " + jobId + "has not been started");
+ finishWithoutStart.add(jobId);
+ }
+ if (finished.contains(jobId)) {
+ // TODO: job finish should be called once only when it has really completed
+ // throw new HyracksDataException("Job " + jobId + "has been finished before");
+ LOGGER.log(Level.WARNING, "Dangerous: Duplicate Job: " + jobId + " has finished with status: " + jobStatus);
+ increment(doubleFinished, jobId);
+ }
+ finished.add(jobId);
+ }
+
+ public void check() throws Exception {
+ LOGGER.log(Level.WARNING, "Checking all created jobs have started");
+ for (JobId jobId : created.keySet()) {
+ if (!started.contains(jobId)) {
+ LOGGER.log(Level.WARNING, "JobId " + jobId + " has been created but never started");
+ }
+ }
+ LOGGER.log(Level.WARNING, "Checking all started jobs have terminated");
+ for (JobId jobId : started) {
+ if (!finished.contains(jobId)) {
+ LOGGER.log(Level.WARNING, "JobId " + jobId + " has started but not finished");
+ }
+ }
+ LOGGER.log(Level.WARNING, "Checking multiple creates");
+ for (Entry<JobId, Integer> entry : doubleCreated.entrySet()) {
+ LOGGER.log(Level.WARNING, "job " + entry.getKey() + " has been created " + entry.getValue() + " times");
+ }
+ LOGGER.log(Level.WARNING, "Checking multiple starts");
+ for (Entry<JobId, Integer> entry : doubleStarted.entrySet()) {
+ LOGGER.log(Level.WARNING, "job " + entry.getKey() + " has been started " + entry.getValue() + " times");
+ }
+ LOGGER.log(Level.WARNING, "Checking multiple finishes");
+ for (Entry<JobId, Integer> entry : doubleFinished.entrySet()) {
+ LOGGER.log(Level.WARNING, "job " + entry.getKey() + " has been finished " + entry.getValue() + " times");
+ }
+ LOGGER.log(Level.WARNING, "Done checking!");
+ }
+}