Fix potential hang in op.initialize().
- Let initialize()/deinitialize() always pair up.
Change-Id: I701b271bc6dc78e67274fa845dec013756843a70
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1243
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 4ee8303..3e557a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -20,12 +20,13 @@
package org.apache.hyracks.api.rewriter.runtime;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Queue;
+import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
@@ -56,6 +57,7 @@
private final int partition;
private final int nPartitions;
private int inputArity = 0;
+ private boolean[] startedInitialization;
public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -79,8 +81,11 @@
@Override
public void initialize() throws HyracksDataException {
- // Initializes all OperatorNodePushables in parallel.
- runInParallel(op -> op.initialize());
+ // Initializes all OperatorNodePushables in parallel, marking each as started before calling initialize().
+ runInParallel((op, index) -> {
+ startedInitialization[index] = true;
+ op.initialize();
+ });
}
public void init() throws HyracksDataException {
@@ -150,12 +155,19 @@
}
}
}
+
+ // Initializes all startedInitialization flags to false.
+ startedInitialization = new boolean[operatorNodePushablesBFSOrder.size()];
+ Arrays.fill(startedInitialization, false);
}
@Override
public void deinitialize() throws HyracksDataException {
- // De-initialize all OperatorNodePushables in parallel.
- runInParallel(op -> op.deinitialize());
+ runInParallel((op, index) -> {
+ if (startedInitialization[index]) {
+ op.deinitialize();
+ }
+ });
}
@Override
@@ -191,18 +203,21 @@
}
interface OperatorNodePushableAction {
- public void runAction(IOperatorNodePushable op) throws HyracksDataException;
+ void runAction(IOperatorNodePushable op, int opIndex) throws HyracksDataException;
}
- private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException {
- List<Future<Void>> initializationTasks = new ArrayList<Future<Void>>();
+ private void runInParallel(OperatorNodePushableAction opAction)
+ throws HyracksDataException {
+ List<Future<Void>> initializationTasks = new ArrayList<>();
try {
+ int index = 0;
// Run one action for all OperatorNodePushables in parallel through a thread pool.
for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
+ final int opIndex = index++;
initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
- opAction.runAction(op);
+ opAction.runAction(op, opIndex);
return null;
}
}));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index f463bfa..7c626c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -235,8 +235,12 @@
}
}
- private synchronized void addPendingThread(Thread t) {
+ private synchronized boolean addPendingThread(Thread t) {
+ if (aborted) {
+ return false;
+ }
pendingThreads.add(t);
+ return true;
}
private synchronized void removePendingThread(Thread t) {
@@ -256,9 +260,16 @@
public void run() {
Thread ct = Thread.currentThread();
String threadName = ct.getName();
- addPendingThread(ct);
+ ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
+ // Calls synchronized addPendingThread(..) to make sure that, in the abort() method,
+ // the thread does not escape interruption.
+ if (!addPendingThread(ct)) {
+ exceptions.add(new InterruptedException("Task " + getTaskAttemptId() + " was aborted!"));
+ ExceptionUtils.setNodeIds(exceptions, ncs.getId());
+ ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
+ return;
+ }
try {
- ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
try {
operator.initialize();
if (collectors.length > 0) {
@@ -271,11 +282,12 @@
executorService.execute(new Runnable() {
@Override
public void run() {
- if (aborted) {
+ Thread thread = Thread.currentThread();
+ // Calls synchronized addPendingThread(..) to make sure that, in the abort() method,
+ // the thread does not escape interruption.
+ if (!addPendingThread(thread)) {
return;
}
- Thread thread = Thread.currentThread();
- addPendingThread(thread);
String oldName = thread.getName();
thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
thread.setPriority(Thread.MIN_PRIORITY);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 452da88..b76e458 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -26,13 +26,18 @@
import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
import org.apache.hyracks.tests.util.ExceptionOnCreatePushRuntimeOperatorDescriptor;
import org.junit.Assert;
+import org.junit.Test;
public class JobFailureTest extends AbstractMultiNCIntegrationTest {
- // commenting out due to intermittent hangs:
- // https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/2877/artifact/target/threaddumps/jstack_28541.html
- // @Test
+ @Test
public void failureOnCreatePushRuntime() throws Exception {
+ for (int round = 0; round < 1000; ++round) {
+ execTest();
+ }
+ }
+
+ private void execTest() throws Exception {
JobSpecification spec = new JobSpecification();
AbstractSingleActivityOperatorDescriptor sourceOpDesc = new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec,
0, 1, new int[] { 4 }, true);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
index 8c5bf48..14c644a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
@@ -20,6 +20,8 @@
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -32,6 +34,7 @@
public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
+ private static Logger LOGGER = Logger.getLogger(ExceptionOnCreatePushRuntimeOperatorDescriptor.class.getName());
private static AtomicInteger createPushRuntime = new AtomicInteger();
private static AtomicInteger initializeCounter = new AtomicInteger();
private static AtomicInteger openCloseCounter = new AtomicInteger();
@@ -126,10 +129,10 @@
public static boolean succeed() {
boolean success = openCloseCounter.get() == 0 && createPushRuntime.get() == 0 && initializeCounter.get() == 0;
if (!success) {
- System.err.println("Failure:");
- System.err.println("CreatePushRuntime:" + createPushRuntime.get());
- System.err.println("InitializeCounter:" + initializeCounter.get());
- System.err.println("OpenCloseCounter:" + openCloseCounter.get());
+ LOGGER.log(Level.SEVERE, "Failure:");
+ LOGGER.log(Level.SEVERE, "CreatePushRuntime:" + createPushRuntime.get());
+ LOGGER.log(Level.SEVERE, "InitializeCounter:" + initializeCounter.get());
+ LOGGER.log(Level.SEVERE, "OpenCloseCounter:" + openCloseCounter.get());
}
return success;
}