[NO ISSUE][RT] Add test cases for runtime failures
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Add three test cases:
- RuntimeException during initialize()
- RuntimeException during deinitialize()
- RuntimeException during both
Change-Id: If26ade138b003349cfd9619188bd9129ecd1034a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2290
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index a455cc9..7100895 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -143,7 +143,7 @@
hcc.cancelJob(jobId);
}
- protected void runTest(JobSpecification spec, String expectedErrorMessage) throws Exception {
+ protected JobId runTest(JobSpecification spec, String expectedErrorMessage) throws Exception {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(spec.toJSON().asText());
}
@@ -195,6 +195,7 @@
// Waiting a second time should lead to the same behavior
waitForCompletion(jobId, expectedErrorMessage);
dumpOutputFiles();
+ return jobId;
}
protected void waitForCompletion(JobId jobId, String expectedErrorMessage) throws Exception {
@@ -244,7 +245,8 @@
@Override
public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
return maxRAM > job.getRequiredClusterCapacity().getAggregatedMemoryByteSize()
- ? JobSubmissionStatus.EXECUTE : JobSubmissionStatus.QUEUE;
+ ? JobSubmissionStatus.EXECUTE
+ : JobSubmissionStatus.QUEUE;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 34b1480..478138e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
import org.apache.hyracks.tests.util.ExceptionOnCreatePushRuntimeOperatorDescriptor;
+import org.apache.hyracks.tests.util.FailOnInitializeDeInitializeOperatorDescriptor;
import org.junit.Assert;
import org.junit.Test;
@@ -33,10 +34,13 @@
@Test
public void failureOnCreatePushRuntime() throws Exception {
- JobId jobId = new JobId(0); // First job
+ JobId jobId = null;
for (int i = 0; i < 20; i++) {
- execTest();
+ JobSpecification spec = new JobSpecification();
+ JobId runJobId = runTest(spec,
+ new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec, 0, 1, new int[] { 4 }, true));
if (i == 0) {
+ jobId = runJobId;
// passes. read from job archive
waitForCompletion(jobId, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
}
@@ -44,7 +48,8 @@
// passes. read from job history
waitForCompletion(jobId, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
for (int i = 0; i < 300; i++) {
- execTest();
+ JobSpecification spec = new JobSpecification();
+ runTest(spec, new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec, 0, 1, new int[] { 4 }, true));
}
// passes. history has been cleared
waitForCompletion(jobId, "has been cleared from job history");
@@ -56,10 +61,52 @@
waitForCompletion(jobId, "has not been created yet");
}
- private void execTest() throws Exception {
+ @Test
+ public void failureOnInit() throws Exception {
JobSpecification spec = new JobSpecification();
- AbstractSingleActivityOperatorDescriptor sourceOpDesc =
- new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec, 0, 1, new int[] { 4 }, true);
+ connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, true, false),
+ FailOnInitializeDeInitializeOperatorDescriptor.INIT_ERROR_MESSAGE);
+ // Ensure you can run the next job
+ spec = new JobSpecification();
+ connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, false, false), null);
+ }
+
+ @Test
+ public void failureOnDeinit() throws Exception {
+ JobSpecification spec = new JobSpecification();
+ connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, false, true),
+ FailOnInitializeDeInitializeOperatorDescriptor.DEINIT_ERROR_MESSAGE);
+ // Ensure you can run the next job
+ spec = new JobSpecification();
+ connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, false, false), null);
+ }
+
+ @Test
+ public void failureOnInitDeinit() throws Exception {
+ JobSpecification spec = new JobSpecification();
+ connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, true, true),
+ FailOnInitializeDeInitializeOperatorDescriptor.INIT_ERROR_MESSAGE);
+ // Ensure you can run the next job
+ spec = new JobSpecification();
+ connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, false, false), null);
+ }
+
+ private JobId runTest(JobSpecification spec, AbstractSingleActivityOperatorDescriptor sourceOpDesc)
+ throws Exception {
+ try {
+ return connectToSinkAndRun(spec, sourceOpDesc,
+ ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
+ } finally {
+ Assert.assertTrue(
+ ExceptionOnCreatePushRuntimeOperatorDescriptor.stats()
+ + ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed(),
+ ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed());
+ // should also check the content of the different ncs
+ }
+ }
+
+ private JobId connectToSinkAndRun(JobSpecification spec, AbstractSingleActivityOperatorDescriptor sourceOpDesc,
+ String expectedError) throws Exception {
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sourceOpDesc, ASTERIX_IDS);
SinkOperatorDescriptor sinkOpDesc = new SinkOperatorDescriptor(spec, 1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOpDesc, ASTERIX_IDS);
@@ -67,15 +114,10 @@
spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
spec.addRoot(sinkOpDesc);
try {
- runTest(spec, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
+ return runTest(spec, expectedError);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
- Assert.assertTrue(
- ExceptionOnCreatePushRuntimeOperatorDescriptor.stats()
- + ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed(),
- ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed());
- // should also check the content of the different ncs
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/FailOnInitializeDeInitializeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/FailOnInitializeDeInitializeOperatorDescriptor.java
new file mode 100644
index 0000000..56d59ab
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/FailOnInitializeDeInitializeOperatorDescriptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.util;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class FailOnInitializeDeInitializeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ public static final String INIT_ERROR_MESSAGE = "Failure on initialize()";
+ public static final String DEINIT_ERROR_MESSAGE = "Failure on deinitialize()";
+ private final boolean failOnInit;
+ private final boolean failOnDeinit;
+
+ public FailOnInitializeDeInitializeOperatorDescriptor(IOperatorDescriptorRegistry spec, boolean failOnInit,
+ boolean failOnDeInit) {
+ super(spec, 0, 1);
+ this.failOnInit = failOnInit;
+ this.failOnDeinit = failOnDeInit;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new IOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ if (failOnInit) {
+ throw new RuntimeException(INIT_ERROR_MESSAGE);
+ }
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ if (failOnDeinit) {
+ throw new RuntimeException(DEINIT_ERROR_MESSAGE);
+ }
+ }
+
+ @Override
+ public int getInputArity() {
+ return 0;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+ throws HyracksDataException {
+ // ignore
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return null;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return FailOnInitializeDeInitializeOperatorDescriptor.class.getSimpleName();
+ }
+ };
+ }
+}