ASTERIXDB-1642,ASTERIXDB-1657,ASTERIXDB-1658 Fix Task Failure Handling
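
When starting a task fails on an NC, StartTasksWork now logs the
failure and schedules a NotifyTaskFailureWork so the CC is told about
the failed attempt, instead of printing the stack trace and throwing an
unhandled RuntimeException. JobScheduler.notifyTaskFailure no longer
needs the owning ActivityCluster, AbortTasksWork logs a warning when
the joblet is already gone, and a missing ActivityClusterGraph now
surfaces as a HyracksException rather than a NullPointerException. A
new JobFailureTest runs a job whose operator throws in
createPushRuntime and verifies that every runtime that was created is
cleaned up; AbstractMultiNCIntegrationTest now skips result reading for
jobs without result sets.
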
Change-Id: I2ec2c798b704ca426d5937f22e6d2bd394a9095a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1197
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
index b377b1a..ab026eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
@@ -626,15 +626,10 @@
/**
* Indicates that a single task attempt has encountered a failure.
- *
- * @param ta
- * - Failed Task Attempt
- * @param ac
- * - Activity Cluster that owns this Task
- * @param details
- * - Cause of the failure
+ * @param ta Failed Task Attempt
+ * @param exceptions the exceptions thrown during the failure
*/
- public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, List<Exception> exceptions) {
+ public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
try {
LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 6db3700..8bca4e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -21,7 +21,6 @@
import java.util.List;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.job.ActivityCluster;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.JobRun;
@@ -40,8 +39,7 @@
protected void performEvent(TaskAttempt ta) {
JobRun run = ccs.getActiveRunMap().get(jobId);
ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
- ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
- run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
+ run.getScheduler().notifyTaskFailure(ta, exceptions);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 0d9ff5d..c91ebd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -165,6 +165,7 @@
this.nodeId = nodeId;
}
+ @Override
public String toString() {
return super.toString() + "@" + nodeId;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
index 6566655..07e1ad2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
@@ -55,9 +55,7 @@
if (dpm != null) {
ncs.getDatasetPartitionManager().abortReader(jobId);
}
-
- Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
- Joblet ji = jobletMap.get(jobId);
+ Joblet ji = ncs.getJobletMap().get(jobId);
if (ji != null) {
Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
for (TaskAttemptId taId : tasks) {
@@ -66,6 +64,9 @@
task.abort();
}
}
+ } else {
+ LOGGER.log(Level.WARNING, "Joblet couldn't be found. Tasks of job " + jobId
+ + " have all either completed or failed");
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 98f2097..ad9481d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -47,6 +47,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityCluster;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobFlag;
@@ -55,6 +56,7 @@
import org.apache.hyracks.comm.channels.NetworkInputChannel;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.control.common.utils.ExceptionUtils;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -97,11 +99,11 @@
@Override
public void run() {
+ Task task = null;
try {
NCApplicationContext appCtx = ncs.getApplicationContext();
- final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes);
+ Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes);
final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
-
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@Override
public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
@@ -117,7 +119,6 @@
return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
}
};
-
for (TaskAttemptDescriptor td : taskDescriptors) {
TaskAttemptId taId = td.getTaskAttemptId();
TaskId tid = taId.getTaskId();
@@ -129,7 +130,7 @@
}
final int partition = tid.getPartition();
List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
- Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
+ task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
createInputChannels(td, inputs));
IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
@@ -161,30 +162,32 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("output: " + i + ": " + conn.getConnectorId());
}
- IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
- td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+ IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td
+ .getPartitionCount(), td.getOutputPartitionCounts()[i]);
operator.setOutputFrameWriter(i, writer, recordDesc);
}
}
task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
joblet.addTask(task);
-
task.start();
}
} catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
+ LOGGER.log(Level.WARNING, "Failure starting a task", e);
+ // notify cc of start task failure
+ List<Exception> exceptions = new ArrayList<>();
+ exceptions.add(e);
+ ExceptionUtils.setNodeIds(exceptions, ncs.getId());
+ ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions));
}
}
private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx,
- byte[] acgBytes) throws Exception {
+ byte[] acgBytes) throws HyracksException {
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
if (acgBytes == null) {
- throw new NullPointerException("JobActivityGraph was null");
+ throw new HyracksException("Joblet was not found. This job was most likely aborted.");
}
ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId,
appCtx);
@@ -197,11 +200,11 @@
private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
throws HyracksDataException {
- IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
- td.getInputPartitionCounts()[i], td.getPartitionCount());
+ IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td
+ .getInputPartitionCounts()[i], td.getPartitionCount());
if (cPolicy.materializeOnReceiveSide()) {
- return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
- task.getTaskAttemptId(), ncs.getExecutorService());
+ return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
+ .getTaskAttemptId(), ncs.getExecutorService());
} else {
return collector;
}
@@ -222,10 +225,12 @@
};
} else {
factory = new IPartitionWriterFactory() {
+
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
- jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService());
+ jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
+ .getExecutorService());
}
};
}
@@ -233,8 +238,8 @@
factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
- conn.getConnectorId(), senderIndex, receiverIndex), taId);
+ return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn
+ .getConnectorId(), senderIndex, receiverIndex), taId);
}
};
}
@@ -254,8 +259,8 @@
* @return a list of known channels, one for each connector
* @throws UnknownHostException
*/
- private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td, List<IConnectorDescriptor> inputs)
- throws UnknownHostException {
+ private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td,
+ List<IConnectorDescriptor> inputs) throws UnknownHostException {
NetworkAddress[][] inputAddresses = td.getInputPartitionLocations();
List<List<PartitionChannel>> channelsForInputConnectors = new ArrayList<List<PartitionChannel>>();
if (inputAddresses != null) {
@@ -266,8 +271,8 @@
NetworkAddress networkAddress = inputAddresses[i][j];
PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
.getTaskAttemptId().getTaskId().getPartition());
- PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(
- ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+ PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs
+ .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
.lookupIpAddress()), networkAddress.getPort()), pid, 5));
channels.add(channel);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 9685837..dc0b6f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -27,12 +27,6 @@
import java.util.logging.Logger;
import org.apache.commons.io.FileUtils;
-import org.json.JSONArray;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -51,6 +45,11 @@
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.json.JSONArray;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
public abstract class AbstractMultiNCIntegrationTest {
@@ -72,7 +71,6 @@
public AbstractMultiNCIntegrationTest() {
outputFiles = new ArrayList<File>();
- ;
}
@BeforeClass
@@ -135,37 +133,38 @@
IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
- IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
- IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
+ if (!spec.getResultSetIds().isEmpty()) {
+ IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
+ IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
- JSONArray resultRecords = new JSONArray();
- ByteBufferInputStream bbis = new ByteBufferInputStream();
+ JSONArray resultRecords = new JSONArray();
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
- int readSize = reader.read(resultFrame);
+ int readSize = reader.read(resultFrame);
- while (readSize > 0) {
+ while (readSize > 0) {
- try {
- frameTupleAccessor.reset(resultFrame.getBuffer());
- for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
- int start = frameTupleAccessor.getTupleStartOffset(tIndex);
- int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
- bbis.setByteBuffer(resultFrame.getBuffer(), start);
- byte[] recordBytes = new byte[length];
- bbis.read(recordBytes, 0, length);
- resultRecords.put(new String(recordBytes, 0, length));
- }
- } finally {
try {
- bbis.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
+ frameTupleAccessor.reset(resultFrame.getBuffer());
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ int start = frameTupleAccessor.getTupleStartOffset(tIndex);
+ int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
+ bbis.setByteBuffer(resultFrame.getBuffer(), start);
+ byte[] recordBytes = new byte[length];
+ bbis.read(recordBytes, 0, length);
+ resultRecords.put(new String(recordBytes, 0, length));
+ }
+ } finally {
+ try {
+ bbis.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
+
+ readSize = reader.read(resultFrame);
}
-
- readSize = reader.read(resultFrame);
}
-
hcc.waitForCompletion(jobId);
dumpOutputFiles();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
new file mode 100644
index 0000000..6a7a6a7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
+import org.apache.hyracks.tests.util.ExceptionOnCreatePushRuntimeOperatorDescriptor;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JobFailureTest extends AbstractMultiNCIntegrationTest {
+
+ @Test
+ public void failureOnCreatePushRuntime() throws Exception {
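+ // Topology: a source that throws in createPushRuntime on partition 4
+ // (and parks all other partitions in initialize()) feeds a sink through
+ // a one-to-one connector, with both operators placed on every NC.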
+ JobSpecification spec = new JobSpecification();
+ AbstractSingleActivityOperatorDescriptor sourceOpDesc = new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec,
+ 0, 1, new int[] { 4 }, true);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sourceOpDesc, ASTERIX_IDS);
+ SinkOperatorDescriptor sinkOpDesc = new SinkOperatorDescriptor(spec, 1);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOpDesc, ASTERIX_IDS);
+ IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
+ spec.addRoot(sinkOpDesc);
+ try {
+ runTest(spec);
+ } catch (Exception e) {
+ // Expected: this job is built to fail; print the exception for reference.
+ e.printStackTrace();
+ }
+ Assert.assertTrue(ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed());
+ // TODO: should also check the post-job state of each NC
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
new file mode 100644
index 0000000..8c5bf48
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
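+ // Each counter must return to zero for succeed() to pass:
+ // createPushRuntime is incremented on entry to createPushRuntime() and
+ // decremented in its finally block, initializeCounter pairs initialize()
+ // with deinitialize(), and openCloseCounter pairs open() with close().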
+ private static AtomicInteger createPushRuntime = new AtomicInteger();
+ private static AtomicInteger initializeCounter = new AtomicInteger();
+ private static AtomicInteger openCloseCounter = new AtomicInteger();
+ private final int[] exceptionPartitions;
+ private final boolean sleepOnInitialize;
+
+ public ExceptionOnCreatePushRuntimeOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity,
+ int outputArity, int[] exceptionPartitions, boolean sleepOnInitialize) {
+ super(spec, inputArity, outputArity);
+ this.exceptionPartitions = exceptionPartitions;
+ this.sleepOnInitialize = sleepOnInitialize;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ createPushRuntime.incrementAndGet();
+ try {
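+ // Fail runtime creation on the designated partitions.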
+ if (exceptionPartitions != null) {
+ for (int p : exceptionPartitions) {
+ if (p == partition) {
+ throw new HyracksDataException("I throw exceptions");
+ }
+ }
+ }
+ return new IOperatorNodePushable() {
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+ throws HyracksDataException {
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ initializeCounter.incrementAndGet();
+ if (sleepOnInitialize) {
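+ // Park this task so it is still running when another partition fails;
+ // the abort path is expected to interrupt the wait below.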
+ try {
+ synchronized (this) {
+ wait();
+ }
+ } catch (InterruptedException e) {
+ // can safely interrupt thread since this is a task thread
+ Thread.currentThread().interrupt();
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return new IFrameWriter() {
+ @Override
+ public void open() throws HyracksDataException {
+ openCloseCounter.incrementAndGet();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ openCloseCounter.decrementAndGet();
+ }
+ };
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return ExceptionOnCreatePushRuntimeOperatorDescriptor.class.getSimpleName()
+ + ".OperatorNodePushable:" + partition;
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ initializeCounter.decrementAndGet();
+ }
+ };
+ } finally {
+ createPushRuntime.decrementAndGet();
+ }
+ }
+
+ public static boolean succeed() {
+ boolean success = openCloseCounter.get() == 0 && createPushRuntime.get() == 0 && initializeCounter.get() == 0;
+ if (!success) {
+ System.err.println("Failure:");
+ System.err.println("CreatePushRuntime:" + createPushRuntime.get());
+ System.err.println("InitializeCounter:" + initializeCounter.get());
+ System.err.println("OpenCloseCounter:" + openCloseCounter.get());
+ }
+ return success;
+ }
+}