ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable
Change-Id: Ie5994f8a51dcf43e42325e89215758c310cd7b99
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1681
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 4c0eb1b..c9cdb2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -20,6 +20,8 @@
package org.apache.hyracks.api.exceptions;
import java.io.Serializable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.util.ErrorMessageUtil;
@@ -29,11 +31,17 @@
public class HyracksDataException extends HyracksException {
private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(HyracksDataException.class.getName());
public static HyracksDataException create(Throwable cause) {
if (cause instanceof HyracksDataException || cause == null) {
return (HyracksDataException) cause;
}
+ if (cause instanceof InterruptedException && !Thread.currentThread().isInterrupted()) {
+ LOGGER.log(Level.WARNING,
+ "Wrapping an InterruptedException in HyracksDataException and current thread is not interrupted",
+ cause);
+ }
return new HyracksDataException(cause);
}
@@ -46,8 +54,8 @@
}
public static HyracksDataException create(HyracksDataException e, String nodeId) {
- return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId, e
- .getParams());
+ return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId,
+ e.getParams());
}
public static HyracksDataException suppress(HyracksDataException root, Throwable th) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 1c4f916..eeaee04 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -20,14 +20,14 @@
package org.apache.hyracks.api.rewriter.runtime;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Map.Entry;
+import java.util.Queue;
import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -43,12 +43,10 @@
/**
* The runtime of a SuperActivity, which internally executes a DAG of one-to-one
* connected activities in a single thread.
- *
- * @author yingyib
*/
public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
- private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
- private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
+ private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>();
+ private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>();
private final Map<ActivityId, IActivity> startActivities;
private final SuperActivity parent;
private final IHyracksTaskContext ctx;
@@ -56,7 +54,6 @@
private final int partition;
private final int nPartitions;
private int inputArity = 0;
- private boolean[] startedInitialization;
public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -80,24 +77,25 @@
@Override
public void initialize() throws HyracksDataException {
- // Initializes all OperatorNodePushables in parallel and then finally deinitializes them.
- runInParallel((op, index) -> {
- startedInitialization[index] = true;
- op.initialize();
- });
+ runInParallel(op -> op.initialize());
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ runInParallel(op -> op.deinitialize());
}
private void init() throws HyracksDataException {
- Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
- Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
- List<IConnectorDescriptor> outputConnectors = null;
+ Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<>();
+ Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
+ List<IConnectorDescriptor> outputConnectors;
/**
* Set up the source operators
*/
for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
- IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
- nPartitions);
+ IOperatorNodePushable opPushable =
+ entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
startOperatorNodePushables.put(entry.getKey(), opPushable);
operatorNodePushablesBFSOrder.add(opPushable);
operatorNodePushables.put(entry.getKey(), opPushable);
@@ -154,19 +152,6 @@
}
}
}
-
- // Sets the startedInitialization flags to be false.
- startedInitialization = new boolean[operatorNodePushablesBFSOrder.size()];
- Arrays.fill(startedInitialization, false);
- }
-
- @Override
- public void deinitialize() throws HyracksDataException {
- runInParallel((op, index) -> {
- if (startedInitialization[index]) {
- op.deinitialize();
- }
- });
}
@Override
@@ -192,8 +177,7 @@
*/
Pair<ActivityId, Integer> activityIdInputIndex = parent.getActivityIdInputIndex(index);
IOperatorNodePushable operatorNodePushable = operatorNodePushables.get(activityIdInputIndex.getLeft());
- IFrameWriter writer = operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
- return writer;
+ return operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
}
@Override
@@ -201,31 +185,40 @@
return "Super Activity " + parent.getActivityMap().values().toString();
}
+ @FunctionalInterface
interface OperatorNodePushableAction {
- void runAction(IOperatorNodePushable op, int opIndex) throws HyracksDataException;
+ void run(IOperatorNodePushable op) throws HyracksDataException;
}
- private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException {
- List<Future<Void>> initializationTasks = new ArrayList<>();
+ private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException {
+ List<Future<Void>> tasks = new ArrayList<>();
+ final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
+ final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
try {
- int index = 0;
- // Run one action for all OperatorNodePushables in parallel through a thread pool.
for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
- final int opIndex = index++;
- initializationTasks.add(ctx.getExecutorService().submit(() -> {
- opAction.runAction(op, opIndex);
+ tasks.add(ctx.getExecutorService().submit(() -> {
+ startSemaphore.release();
+ try {
+ action.run(op);
+ } finally {
+ completeSemaphore.release();
+ }
return null;
}));
}
- // Waits until all parallel actions to finish.
- for (Future<Void> initializationTask : initializationTasks) {
- initializationTask.get();
+ for (Future<Void> task : tasks) {
+ task.get();
}
} catch (Exception e) {
- for (Future<Void> initializationTask : initializationTasks) {
- initializationTask.cancel(true);
+ try {
+ startSemaphore.acquireUninterruptibly();
+ for (Future<Void> task : tasks) {
+ task.cancel(true);
+ }
+ } finally {
+ completeSemaphore.acquireUninterruptibly();
}
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}