Fixed lost interrupts

git-svn-id: https://hyracks.googlecode.com/svn/trunk@624 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
index ecaf013..f2f3a38 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
@@ -100,6 +100,7 @@
             try {
                 wait();
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 throw new HyracksDataException(e);
             }
         }
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index 0362583..98c660f 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -70,72 +70,70 @@
     private synchronized void addOperatorThread(Thread t) {
         if (abort) {
             t.interrupt();
-            return;
         }
         opThreads.add(t);
     }
 
     private synchronized void removeOperatorThread(Thread t) {
-        opThreads.add(t);
+        opThreads.remove(t);
     }
 
     @Override
     public void run() {
+        addOperatorThread(Thread.currentThread());
         try {
-            opNode.initialize();
-            if (nInputs > 0) {
-                final Semaphore sem = new Semaphore(nInputs - 1);
-                for (int i = 1; i < nInputs; ++i) {
-                    final IFrameReader reader = readers[i];
-                    final IFrameWriter writer = opNode.getInputFrameWriter(i);
-                    sem.acquire();
-                    executor.execute(new Runnable() {
-                        public void run() {
-                            try {
-                                if (!aborted()) {
-                                    pushFrames(reader, writer);
+            try {
+                opNode.initialize();
+                if (nInputs > 0) {
+                    final Semaphore sem = new Semaphore(nInputs - 1);
+                    for (int i = 1; i < nInputs; ++i) {
+                        final IFrameReader reader = readers[i];
+                        final IFrameWriter writer = opNode.getInputFrameWriter(i);
+                        sem.acquire();
+                        executor.execute(new Runnable() {
+                            public void run() {
+                                addOperatorThread(Thread.currentThread());
+                                try {
+                                    if (!aborted()) {
+                                        pushFrames(reader, writer);
+                                    }
+                                } catch (HyracksDataException e) {
+                                } finally {
+                                    sem.release();
+                                    removeOperatorThread(Thread.currentThread());
                                 }
-                            } catch (HyracksDataException e) {
-                            } finally {
-                                sem.release();
                             }
-                        }
-                    });
+                        });
+                    }
+                    try {
+                        pushFrames(readers[0], opNode.getInputFrameWriter(0));
+                    } finally {
+                        sem.acquire(nInputs - 1);
+                    }
                 }
-                try {
-                    pushFrames(readers[0], opNode.getInputFrameWriter(0));
-                } finally {
-                    sem.acquire(nInputs - 1);
-                }
+                opNode.deinitialize();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
-            opNode.deinitialize();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+        } finally {
+            removeOperatorThread(Thread.currentThread());
         }
     }
 
     private void pushFrames(IFrameReader reader, IFrameWriter writer) throws HyracksDataException {
-        addOperatorThread(Thread.currentThread());
-        try {
-            ByteBuffer buffer = ctx.allocateFrame();
-            writer.open();
-            reader.open();
-            while (reader.nextFrame(buffer)) {
-                if (aborted()) {
-                    break;
-                }
-                buffer.flip();
-                writer.nextFrame(buffer);
-                buffer.compact();
+        ByteBuffer buffer = ctx.allocateFrame();
+        writer.open();
+        reader.open();
+        while (reader.nextFrame(buffer)) {
+            if (aborted()) {
+                break;
             }
-            reader.close();
-            writer.close();
-        } finally {
-            removeOperatorThread(Thread.currentThread());
-            if (Thread.interrupted()) {
-                throw new HyracksDataException("Thread interrupted");
-            }
+            buffer.flip();
+            writer.nextFrame(buffer);
+            buffer.compact();
         }
+        reader.close();
+        writer.close();
     }
 
     @Override