Misc. Cleanup / InterruptedException Handling
Change-Id: I0059ec85f8376160bb40bad721f3a8e291ad8ac2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1775
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 0beee6f..ddba2d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -98,5 +98,9 @@
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
index 2f3c72b..28e098f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -65,7 +65,7 @@
* extract start activities
*/
List<IConnectorDescriptor> conns = getActivityInputMap().get(entry.getKey());
- if (conns == null || conns.size() == 0) {
+ if (conns == null || conns.isEmpty()) {
startActivities.put(entry.getKey(), entry.getValue());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index eeaee04..314bf8b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -20,6 +20,7 @@
package org.apache.hyracks.api.rewriter.runtime;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -29,6 +30,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -64,7 +66,7 @@
this.partition = partition;
this.nPartitions = nPartitions;
- /**
+ /*
* initialize the writer-relationship for the internal DAG of operator
* node pushables
*/
@@ -77,43 +79,39 @@
@Override
public void initialize() throws HyracksDataException {
- runInParallel(op -> op.initialize());
+ runInParallel(IOperatorNodePushable::initialize);
}
@Override
public void deinitialize() throws HyracksDataException {
- runInParallel(op -> op.deinitialize());
+ runInParallel(IOperatorNodePushable::deinitialize);
}
private void init() throws HyracksDataException {
- Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<>();
Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
List<IConnectorDescriptor> outputConnectors;
- /**
+ /*
* Set up the source operators
*/
for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
- IOperatorNodePushable opPushable =
- entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- startOperatorNodePushables.put(entry.getKey(), opPushable);
+ IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
+ nPartitions);
operatorNodePushablesBFSOrder.add(opPushable);
operatorNodePushables.put(entry.getKey(), opPushable);
inputArity += opPushable.getInputArity();
- outputConnectors = parent.getActivityOutputMap().get(entry.getKey());
- if (outputConnectors != null) {
- for (IConnectorDescriptor conn : outputConnectors) {
- childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
- }
+ outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(),
+ Collections.emptyList());
+ for (IConnectorDescriptor conn : outputConnectors) {
+ childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
}
}
- /**
- * Using BFS (breadth-first search) to construct to runtime execution
- * DAG;
+ /*
+ * Using BFS (breadth-first search) to construct to runtime execution DAG...
*/
- while (childQueue.size() > 0) {
- /**
+ while (!childQueue.isEmpty()) {
+ /*
* construct the source to destination information
*/
Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> channel = childQueue.poll();
@@ -130,25 +128,23 @@
operatorNodePushables.put(destId, destOp);
}
- /**
+ /*
* construct the dataflow connection from a producer to a consumer
*/
sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
- /**
+ /*
* traverse to the child of the current activity
*/
- outputConnectors = parent.getActivityOutputMap().get(destId);
+ outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), destId, Collections.emptyList());
- /**
+ /*
* expend the executing activities further to the downstream
*/
- if (outputConnectors != null && outputConnectors.size() > 0) {
- for (IConnectorDescriptor conn : outputConnectors) {
- if (conn != null) {
- childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
- }
+ for (IConnectorDescriptor conn : outputConnectors) {
+ if (conn != null) {
+ childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
}
}
}
@@ -162,7 +158,7 @@
@Override
public void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer, RecordDescriptor recordDesc)
throws HyracksDataException {
- /**
+ /*
* set the right output frame writer
*/
Pair<ActivityId, Integer> activityIdOutputIndex = parent.getActivityIdOutputIndex(clusterOutputIndex);
@@ -172,7 +168,7 @@
@Override
public IFrameWriter getInputFrameWriter(final int index) {
- /**
+ /*
* get the right IFrameWriter from the cluster input index
*/
Pair<ActivityId, Integer> activityIdInputIndex = parent.getActivityIdInputIndex(index);
@@ -209,16 +205,24 @@
for (Future<Void> task : tasks) {
task.get();
}
- } catch (Exception e) {
- try {
- startSemaphore.acquireUninterruptibly();
- for (Future<Void> task : tasks) {
- task.cancel(true);
- }
- } finally {
- completeSemaphore.acquireUninterruptibly();
- }
+ } catch (InterruptedException e) {
+ cancelTasks(tasks, startSemaphore, completeSemaphore);
+ Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
+ } catch (Exception e) {
+ cancelTasks(tasks, startSemaphore, completeSemaphore);
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) {
+ try {
+ startSemaphore.acquireUninterruptibly();
+ for (Future<Void> task : tasks) {
+ task.cancel(true);
+ }
+ } finally {
+ completeSemaphore.acquireUninterruptibly();
}
}
}