ASTERIXDB-1642,ASTERIXDB-1657,ASTERIXDB-1658 Fix Task Failure Handling

Change-Id: I2ec2c798b704ca426d5937f22e6d2bd394a9095a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1197
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
index b377b1a..ab026eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
@@ -626,15 +626,10 @@
 
     /**
      * Indicates that a single task attempt has encountered a failure.
-     *
-     * @param ta
-     *            - Failed Task Attempt
-     * @param ac
-     *            - Activity Cluster that owns this Task
-     * @param details
-     *            - Cause of the failure
+     * @param ta Failed Task Attempt
+     * @param exceptions exeptions thrown during the failure
      */
-    public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, List<Exception> exceptions) {
+    public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
         try {
             LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
             TaskAttemptId taId = ta.getTaskAttemptId();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 6db3700..8bca4e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.JobRun;
@@ -40,8 +39,7 @@
     protected void performEvent(TaskAttempt ta) {
         JobRun run = ccs.getActiveRunMap().get(jobId);
         ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
-        ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
-        run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
+        run.getScheduler().notifyTaskFailure(ta, exceptions);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 0d9ff5d..c91ebd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -165,6 +165,7 @@
             this.nodeId = nodeId;
         }
 
+        @Override
         public String toString() {
             return super.toString() + "@" + nodeId;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
index 6566655..07e1ad2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
@@ -55,9 +55,7 @@
         if (dpm != null) {
             ncs.getDatasetPartitionManager().abortReader(jobId);
         }
-
-        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
-        Joblet ji = jobletMap.get(jobId);
+        Joblet ji = ncs.getJobletMap().get(jobId);
         if (ji != null) {
             Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
             for (TaskAttemptId taId : tasks) {
@@ -66,6 +64,9 @@
                     task.abort();
                 }
             }
+        } else {
+            LOGGER.log(Level.WARNING, "Joblet couldn't be found. Tasks of job " + jobId
+                    + " have all either completed or failed");
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 98f2097..ad9481d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -47,6 +47,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobFlag;
@@ -55,6 +56,7 @@
 import org.apache.hyracks.comm.channels.NetworkInputChannel;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.control.common.utils.ExceptionUtils;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.Joblet;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -97,11 +99,11 @@
 
     @Override
     public void run() {
+        Task task = null;
         try {
             NCApplicationContext appCtx = ncs.getApplicationContext();
-            final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes);
+            Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes);
             final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
-
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
                 @Override
                 public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
@@ -117,7 +119,6 @@
                     return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                 }
             };
-
             for (TaskAttemptDescriptor td : taskDescriptors) {
                 TaskAttemptId taId = td.getTaskAttemptId();
                 TaskId tid = taId.getTaskId();
@@ -129,7 +130,7 @@
                 }
                 final int partition = tid.getPartition();
                 List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
-                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
+                task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
                         createInputChannels(td, inputs));
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
 
@@ -161,30 +162,32 @@
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("output: " + i + ": " + conn.getConnectorId());
                         }
-                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
-                                td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td
+                                .getPartitionCount(), td.getOutputPartitionCounts()[i]);
                         operator.setOutputFrameWriter(i, writer, recordDesc);
                     }
                 }
 
                 task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
                 joblet.addTask(task);
-
                 task.start();
             }
         } catch (Exception e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
+            LOGGER.log(Level.WARNING, "Failure starting a task", e);
+            // notify cc of start task failure
+            List<Exception> exceptions = new ArrayList<>();
+            ExceptionUtils.setNodeIds(exceptions, ncs.getId());
+            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions));
         }
     }
 
     private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx,
-            byte[] acgBytes) throws Exception {
+            byte[] acgBytes) throws HyracksException {
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
             if (acgBytes == null) {
-                throw new NullPointerException("JobActivityGraph was null");
+                throw new HyracksException("Joblet was not found. This job was most likely aborted.");
             }
             ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId,
                     appCtx);
@@ -197,11 +200,11 @@
     private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
             int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
             throws HyracksDataException {
-        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
-                td.getInputPartitionCounts()[i], td.getPartitionCount());
+        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td
+                .getInputPartitionCounts()[i], td.getPartitionCount());
         if (cPolicy.materializeOnReceiveSide()) {
-            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
-                    task.getTaskAttemptId(), ncs.getExecutorService());
+            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
+                    .getTaskAttemptId(), ncs.getExecutorService());
         } else {
             return collector;
         }
@@ -222,10 +225,12 @@
                 };
             } else {
                 factory = new IPartitionWriterFactory() {
+
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                         return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
-                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService());
+                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
+                                        .getExecutorService());
                     }
                 };
             }
@@ -233,8 +238,8 @@
             factory = new IPartitionWriterFactory() {
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
-                            conn.getConnectorId(), senderIndex, receiverIndex), taId);
+                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn
+                            .getConnectorId(), senderIndex, receiverIndex), taId);
                 }
             };
         }
@@ -254,8 +259,8 @@
      * @return a list of known channels, one for each connector
      * @throws UnknownHostException
      */
-    private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td, List<IConnectorDescriptor> inputs)
-            throws UnknownHostException {
+    private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td,
+            List<IConnectorDescriptor> inputs) throws UnknownHostException {
         NetworkAddress[][] inputAddresses = td.getInputPartitionLocations();
         List<List<PartitionChannel>> channelsForInputConnectors = new ArrayList<List<PartitionChannel>>();
         if (inputAddresses != null) {
@@ -266,8 +271,8 @@
                         NetworkAddress networkAddress = inputAddresses[i][j];
                         PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
                                 .getTaskAttemptId().getTaskId().getPartition());
-                        PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(
-                                ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+                        PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs
+                                .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
                                         .lookupIpAddress()), networkAddress.getPort()), pid, 5));
                         channels.add(channel);
                     }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 9685837..dc0b6f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -27,12 +27,6 @@
 import java.util.logging.Logger;
 
 import org.apache.commons.io.FileUtils;
-import org.json.JSONArray;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -51,6 +45,11 @@
 import org.apache.hyracks.control.nc.resources.memory.FrameManager;
 import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.json.JSONArray;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
 public abstract class AbstractMultiNCIntegrationTest {
 
@@ -72,7 +71,6 @@
 
     public AbstractMultiNCIntegrationTest() {
         outputFiles = new ArrayList<File>();
-        ;
     }
 
     @BeforeClass
@@ -135,37 +133,38 @@
 
         IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
 
-        IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
-        IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
+        if (!spec.getResultSetIds().isEmpty()) {
+            IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
+            IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
 
-        JSONArray resultRecords = new JSONArray();
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
+            JSONArray resultRecords = new JSONArray();
+            ByteBufferInputStream bbis = new ByteBufferInputStream();
 
-        int readSize = reader.read(resultFrame);
+            int readSize = reader.read(resultFrame);
 
-        while (readSize > 0) {
+            while (readSize > 0) {
 
-            try {
-                frameTupleAccessor.reset(resultFrame.getBuffer());
-                for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
-                    int start = frameTupleAccessor.getTupleStartOffset(tIndex);
-                    int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
-                    bbis.setByteBuffer(resultFrame.getBuffer(), start);
-                    byte[] recordBytes = new byte[length];
-                    bbis.read(recordBytes, 0, length);
-                    resultRecords.put(new String(recordBytes, 0, length));
-                }
-            } finally {
                 try {
-                    bbis.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
+                    frameTupleAccessor.reset(resultFrame.getBuffer());
+                    for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                        int start = frameTupleAccessor.getTupleStartOffset(tIndex);
+                        int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
+                        bbis.setByteBuffer(resultFrame.getBuffer(), start);
+                        byte[] recordBytes = new byte[length];
+                        bbis.read(recordBytes, 0, length);
+                        resultRecords.put(new String(recordBytes, 0, length));
+                    }
+                } finally {
+                    try {
+                        bbis.close();
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
                 }
+
+                readSize = reader.read(resultFrame);
             }
-
-            readSize = reader.read(resultFrame);
         }
-
         hcc.waitForCompletion(jobId);
         dumpOutputFiles();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
new file mode 100644
index 0000000..6a7a6a7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
+import org.apache.hyracks.tests.util.ExceptionOnCreatePushRuntimeOperatorDescriptor;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JobFailureTest extends AbstractMultiNCIntegrationTest {
+
+    @Test
+    public void failureOnCreatePushRuntime() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        AbstractSingleActivityOperatorDescriptor sourceOpDesc = new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec,
+                0, 1, new int[] { 4 }, true);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sourceOpDesc, ASTERIX_IDS);
+        SinkOperatorDescriptor sinkOpDesc = new SinkOperatorDescriptor(spec, 1);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOpDesc, ASTERIX_IDS);
+        IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
+        spec.addRoot(sinkOpDesc);
+        try {
+            runTest(spec);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+        Assert.assertTrue(ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed());
+        // should also check the content of the different ncs
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
new file mode 100644
index 0000000..8c5bf48
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private static AtomicInteger createPushRuntime = new AtomicInteger();
+    private static AtomicInteger initializeCounter = new AtomicInteger();
+    private static AtomicInteger openCloseCounter = new AtomicInteger();
+    private final int[] exceptionPartitions;
+    private final boolean sleepOnInitialize;
+
+    public ExceptionOnCreatePushRuntimeOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity,
+            int outputArity, int[] exceptionPartitions, boolean sleepOnInitialize) {
+        super(spec, inputArity, outputArity);
+        this.exceptionPartitions = exceptionPartitions;
+        this.sleepOnInitialize = sleepOnInitialize;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        createPushRuntime.incrementAndGet();
+        try {
+            if (exceptionPartitions != null) {
+                for (int p : exceptionPartitions) {
+                    if (p == partition) {
+                        throw new HyracksDataException("I throw exceptions");
+                    }
+                }
+            }
+            return new IOperatorNodePushable() {
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+                        throws HyracksDataException {
+                }
+
+                @Override
+                public void initialize() throws HyracksDataException {
+                    initializeCounter.incrementAndGet();
+                    if (sleepOnInitialize) {
+                        try {
+                            synchronized (this) {
+                                wait();
+                            }
+                        } catch (InterruptedException e) {
+                            // can safely interrupt thread since this is a task thread
+                            Thread.currentThread().interrupt();
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                }
+
+                @Override
+                public IFrameWriter getInputFrameWriter(int index) {
+                    return new IFrameWriter() {
+                        @Override
+                        public void open() throws HyracksDataException {
+                            openCloseCounter.incrementAndGet();
+                        }
+
+                        @Override
+                        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                        }
+
+                        @Override
+                        public void fail() throws HyracksDataException {
+                        }
+
+                        @Override
+                        public void close() throws HyracksDataException {
+                            openCloseCounter.decrementAndGet();
+                        }
+                    };
+                }
+
+                @Override
+                public int getInputArity() {
+                    return inputArity;
+                }
+
+                @Override
+                public String getDisplayName() {
+                    return ExceptionOnCreatePushRuntimeOperatorDescriptor.class.getSimpleName()
+                            + ".OperatorNodePushable:" + partition;
+                }
+
+                @Override
+                public void deinitialize() throws HyracksDataException {
+                    initializeCounter.decrementAndGet();
+                }
+            };
+        } finally {
+            createPushRuntime.decrementAndGet();
+        }
+    }
+
+    public static boolean succeed() {
+        boolean success = openCloseCounter.get() == 0 && createPushRuntime.get() == 0 && initializeCounter.get() == 0;
+        if (!success) {
+            System.err.println("Failure:");
+            System.err.println("CreatePushRuntime:" + createPushRuntime.get());
+            System.err.println("InitializeCounter:" + initializeCounter.get());
+            System.err.println("OpenCloseCounter:" + openCloseCounter.get());
+        }
+        return success;
+    }
+}