[NO ISSUE][RT] Add test cases for runtime failures

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Add three test cases:
  - RuntimeException during initialize()
  - RuntimeException during deinitialize()
  - RuntimeException during both

Change-Id: If26ade138b003349cfd9619188bd9129ecd1034a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2290
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index a455cc9..7100895 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -143,7 +143,7 @@
         hcc.cancelJob(jobId);
     }
 
-    protected void runTest(JobSpecification spec, String expectedErrorMessage) throws Exception {
+    protected JobId runTest(JobSpecification spec, String expectedErrorMessage) throws Exception {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info(spec.toJSON().asText());
         }
@@ -195,6 +195,7 @@
         // Waiting a second time should lead to the same behavior
         waitForCompletion(jobId, expectedErrorMessage);
         dumpOutputFiles();
+        return jobId;
     }
 
     protected void waitForCompletion(JobId jobId, String expectedErrorMessage) throws Exception {
@@ -244,7 +245,8 @@
                 @Override
                 public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
                     return maxRAM > job.getRequiredClusterCapacity().getAggregatedMemoryByteSize()
-                            ? JobSubmissionStatus.EXECUTE : JobSubmissionStatus.QUEUE;
+                            ? JobSubmissionStatus.EXECUTE
+                            : JobSubmissionStatus.QUEUE;
                 }
 
                 @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 34b1480..478138e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
 import org.apache.hyracks.tests.util.ExceptionOnCreatePushRuntimeOperatorDescriptor;
+import org.apache.hyracks.tests.util.FailOnInitializeDeInitializeOperatorDescriptor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,10 +34,13 @@
 
     @Test
     public void failureOnCreatePushRuntime() throws Exception {
-        JobId jobId = new JobId(0); // First job
+        JobId jobId = null;
         for (int i = 0; i < 20; i++) {
-            execTest();
+            JobSpecification spec = new JobSpecification();
+            JobId runJobId = runTest(spec,
+                    new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec, 0, 1, new int[] { 4 }, true));
             if (i == 0) {
+                jobId = runJobId;
                 // passes. read from job archive
                 waitForCompletion(jobId, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
             }
@@ -44,7 +48,8 @@
         // passes. read from job history
         waitForCompletion(jobId, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
         for (int i = 0; i < 300; i++) {
-            execTest();
+            JobSpecification spec = new JobSpecification();
+            runTest(spec, new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec, 0, 1, new int[] { 4 }, true));
         }
         // passes. history has been cleared
         waitForCompletion(jobId, "has been cleared from job history");
@@ -56,10 +61,52 @@
         waitForCompletion(jobId, "has not been created yet");
     }
 
-    private void execTest() throws Exception {
+    @Test
+    public void failureOnInit() throws Exception {
         JobSpecification spec = new JobSpecification();
-        AbstractSingleActivityOperatorDescriptor sourceOpDesc =
-                new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec, 0, 1, new int[] { 4 }, true);
+        connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, true, false),
+                FailOnInitializeDeInitializeOperatorDescriptor.INIT_ERROR_MESSAGE);
+        // Ensure you can run the next job
+        spec = new JobSpecification();
+        connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, false, false), null);
+    }
+
+    @Test
+    public void failureOnDeinit() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, false, true),
+                FailOnInitializeDeInitializeOperatorDescriptor.DEINIT_ERROR_MESSAGE);
+        // Ensure you can run the next job
+        spec = new JobSpecification();
+        connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, false, false), null);
+    }
+
+    @Test
+    public void failureOnInitDeinit() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, true, true),
+                FailOnInitializeDeInitializeOperatorDescriptor.INIT_ERROR_MESSAGE);
+        // Ensure you can run the next job
+        spec = new JobSpecification();
+        connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, false, false), null);
+    }
+
+    private JobId runTest(JobSpecification spec, AbstractSingleActivityOperatorDescriptor sourceOpDesc)
+            throws Exception {
+        try {
+            return connectToSinkAndRun(spec, sourceOpDesc,
+                    ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
+        } finally {
+            Assert.assertTrue(
+                    ExceptionOnCreatePushRuntimeOperatorDescriptor.stats()
+                            + ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed(),
+                    ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed());
+            // should also check the content of the different ncs
+        }
+    }
+
+    private JobId connectToSinkAndRun(JobSpecification spec, AbstractSingleActivityOperatorDescriptor sourceOpDesc,
+            String expectedError) throws Exception {
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sourceOpDesc, ASTERIX_IDS);
         SinkOperatorDescriptor sinkOpDesc = new SinkOperatorDescriptor(spec, 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOpDesc, ASTERIX_IDS);
@@ -67,15 +114,10 @@
         spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
         spec.addRoot(sinkOpDesc);
         try {
-            runTest(spec, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
+            return runTest(spec, expectedError);
         } catch (Exception e) {
             e.printStackTrace();
             throw e;
         }
-        Assert.assertTrue(
-                ExceptionOnCreatePushRuntimeOperatorDescriptor.stats()
-                        + ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed(),
-                ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed());
-        // should also check the content of the different ncs
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/FailOnInitializeDeInitializeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/FailOnInitializeDeInitializeOperatorDescriptor.java
new file mode 100644
index 0000000..56d59ab
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/FailOnInitializeDeInitializeOperatorDescriptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.util;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class FailOnInitializeDeInitializeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    public static final String INIT_ERROR_MESSAGE = "Failure on initialize()";
+    public static final String DEINIT_ERROR_MESSAGE = "Failure on deinitialize()";
+    private final boolean failOnInit;
+    private final boolean failOnDeinit;
+
+    public FailOnInitializeDeInitializeOperatorDescriptor(IOperatorDescriptorRegistry spec, boolean failOnInit,
+            boolean failOnDeInit) {
+        super(spec, 0, 1);
+        this.failOnInit = failOnInit;
+        this.failOnDeinit = failOnDeInit;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new IOperatorNodePushable() {
+            @Override
+            public void initialize() throws HyracksDataException {
+                if (failOnInit) {
+                    throw new RuntimeException(INIT_ERROR_MESSAGE);
+                }
+            }
+
+            @Override
+            public void deinitialize() throws HyracksDataException {
+                if (failOnDeinit) {
+                    throw new RuntimeException(DEINIT_ERROR_MESSAGE);
+                }
+            }
+
+            @Override
+            public int getInputArity() {
+                return 0;
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+                    throws HyracksDataException {
+                // ignore
+            }
+
+            @Override
+            public IFrameWriter getInputFrameWriter(int index) {
+                return null;
+            }
+
+            @Override
+            public String getDisplayName() {
+                return FailOnInitializeDeInitializeOperatorDescriptor.class.getSimpleName();
+            }
+        };
+    }
+}