Fixed lost interrupts
git-svn-id: https://hyracks.googlecode.com/svn/trunk@624 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
index ecaf013..f2f3a38 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
@@ -100,6 +100,7 @@
try {
wait();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new HyracksDataException(e);
}
}
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index 0362583..98c660f 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -70,72 +70,70 @@
private synchronized void addOperatorThread(Thread t) {
if (abort) {
t.interrupt();
- return;
}
opThreads.add(t);
}
private synchronized void removeOperatorThread(Thread t) {
- opThreads.add(t);
+ opThreads.remove(t);
}
@Override
public void run() {
+ addOperatorThread(Thread.currentThread());
try {
- opNode.initialize();
- if (nInputs > 0) {
- final Semaphore sem = new Semaphore(nInputs - 1);
- for (int i = 1; i < nInputs; ++i) {
- final IFrameReader reader = readers[i];
- final IFrameWriter writer = opNode.getInputFrameWriter(i);
- sem.acquire();
- executor.execute(new Runnable() {
- public void run() {
- try {
- if (!aborted()) {
- pushFrames(reader, writer);
+ try {
+ opNode.initialize();
+ if (nInputs > 0) {
+ final Semaphore sem = new Semaphore(nInputs - 1);
+ for (int i = 1; i < nInputs; ++i) {
+ final IFrameReader reader = readers[i];
+ final IFrameWriter writer = opNode.getInputFrameWriter(i);
+ sem.acquire();
+ executor.execute(new Runnable() {
+ public void run() {
+ addOperatorThread(Thread.currentThread());
+ try {
+ if (!aborted()) {
+ pushFrames(reader, writer);
+ }
+ } catch (HyracksDataException e) {
+ } finally {
+ sem.release();
+ removeOperatorThread(Thread.currentThread());
}
- } catch (HyracksDataException e) {
- } finally {
- sem.release();
}
- }
- });
+ });
+ }
+ try {
+ pushFrames(readers[0], opNode.getInputFrameWriter(0));
+ } finally {
+ sem.acquire(nInputs - 1);
+ }
}
- try {
- pushFrames(readers[0], opNode.getInputFrameWriter(0));
- } finally {
- sem.acquire(nInputs - 1);
- }
+ opNode.deinitialize();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- opNode.deinitialize();
- } catch (Exception e) {
- throw new RuntimeException(e);
+ } finally {
+ removeOperatorThread(Thread.currentThread());
}
}
private void pushFrames(IFrameReader reader, IFrameWriter writer) throws HyracksDataException {
- addOperatorThread(Thread.currentThread());
- try {
- ByteBuffer buffer = ctx.allocateFrame();
- writer.open();
- reader.open();
- while (reader.nextFrame(buffer)) {
- if (aborted()) {
- break;
- }
- buffer.flip();
- writer.nextFrame(buffer);
- buffer.compact();
+ ByteBuffer buffer = ctx.allocateFrame();
+ writer.open();
+ reader.open();
+ while (reader.nextFrame(buffer)) {
+ if (aborted()) {
+ break;
}
- reader.close();
- writer.close();
- } finally {
- removeOperatorThread(Thread.currentThread());
- if (Thread.interrupted()) {
- throw new HyracksDataException("Thread interrupted");
- }
+ buffer.flip();
+ writer.nextFrame(buffer);
+ buffer.compact();
}
+ reader.close();
+ writer.close();
}
@Override