Added waiting for all operators before calling event listener

git-svn-id: https://hyracks.googlecode.com/svn/trunk@589 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionDemultiplexer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionDemultiplexer.java
index 0587722..5e6cf48 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionDemultiplexer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionDemultiplexer.java
@@ -19,7 +19,7 @@
 public interface IConnectionDemultiplexer {
     public int getSenderCount();
 
-    public IConnectionEntry findNextReadyEntry(int lastReadSender);
+    public IConnectionEntry findNextReadyEntry(int lastReadSender) throws HyracksDataException;
 
     public void unreadyEntry(int index);
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
index 9e3cbbe..2acf8e5 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
@@ -19,6 +19,8 @@
 import java.nio.channels.SelectionKey;
 import java.util.UUID;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
 public interface IConnectionEntry {
     ByteBuffer getReadBuffer();
 
@@ -32,7 +34,7 @@
 
     void close() throws IOException;
 
-    void write(ByteBuffer buffer);
+    void write(ByteBuffer buffer) throws HyracksDataException;
 
     UUID getJobId();
 
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 1001161..de53abe 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -63,7 +63,9 @@
     private final IWorkspaceFileFactory fileFactory;
 
     private IJobletEventListener jobletEventListener;
-    
+
+    private int nPendingOperators;
+
     public Joblet(NodeControllerService nodeController, UUID jobId, int attempt, INCApplicationContext appCtx) {
         this.nodeController = nodeController;
         this.appCtx = appCtx;
@@ -74,6 +76,7 @@
         counterMap = new Hashtable<String, Counter>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
+        nPendingOperators = 0;
     }
 
     @Override
@@ -221,4 +224,21 @@
     public void setJobletEventListener(IJobletEventListener jobletEventListener) {
         this.jobletEventListener = jobletEventListener;
     }
+
+    public synchronized void incrementOperatorCount() { // records one more operator as pending on this Joblet
+        ++nPendingOperators; // guarded by this Joblet's monitor (method is synchronized)
+    }
+
+    public synchronized void decrementOperatorCount() { // records that one pending operator is done
+        --nPendingOperators;
+        if (nPendingOperators <= 0) {
+            notifyAll(); // wake every thread blocked in waitForPendingOperators()
+        }
+    }
+
+    public synchronized void waitForPendingOperators() throws InterruptedException { // blocks until the pending-operator count is no longer positive
+        while (nPendingOperators > 0) {
+            wait(); // releases this Joblet's monitor while waiting; the loop re-checks the count after every wakeup
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 90b2a4d..6d610ac 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -438,14 +438,14 @@
         si.setEndpointList(null);
     }
 
-    private Joblet getLocalJoblet(UUID jobId) throws Exception {
+    private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception { // looks up jobId in jobletMap while holding this service's monitor; result is null when absent
         Joblet ji = jobletMap.get(jobId);
         return ji;
     }
 
     private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx, JobPlan plan)
             throws Exception {
-        synchronized (jobletMap) {
+        synchronized (this) {
             Joblet ji = jobletMap.get(jobId);
             if (ji == null || ji.getAttempt() != attempt) {
                 ji = new Joblet(this, jobId, attempt, appCtx);
@@ -470,8 +470,12 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
-        Joblet joblet = jobletMap.remove(jobId);
+        Joblet joblet;
+        synchronized (this) {
+            joblet = jobletMap.remove(jobId);
+        }
         if (joblet != null) {
+            joblet.waitForPendingOperators();
             IJobletEventListener listener = joblet.getJobletEventListener();
             if (listener != null) {
                 listener.jobletFinish(success);
@@ -483,7 +487,10 @@
 
     @Override
     public void startStage(UUID jobId, UUID stageId) throws Exception {
-        Joblet ji = jobletMap.get(jobId);
+        Joblet ji;
+        synchronized (this) {
+            ji = jobletMap.get(jobId);
+        }
         if (ji != null) {
             Stagelet s = ji.getStagelet(stageId);
             if (s != null) {
@@ -579,19 +586,25 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Aborting Job: " + jobId + ":" + attempt);
         }
-        Joblet ji = jobletMap.get(jobId);
+        Joblet ji;
+        synchronized (this) {
+            ji = jobletMap.get(jobId);
+        }
         if (ji != null) {
             if (ji.getAttempt() == attempt) {
-                Joblet joblet = jobletMap.remove(jobId);
-                IJobletEventListener listener = joblet.getJobletEventListener();
-                if (listener != null) {
-                    listener.jobletFinish(false);
-                }
                 for (Stagelet stagelet : ji.getStageletMap().values()) {
                     stagelet.abort();
                     stagelet.close();
                     connectionManager.abortConnections(jobId, stagelet.getStageId());
                 }
+                synchronized (this) {
+                    jobletMap.remove(jobId);
+                }
+                ji.waitForPendingOperators();
+                IJobletEventListener listener = ji.getJobletEventListener();
+                if (listener != null) {
+                    listener.jobletFinish(false);
+                }
             }
         }
     }
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index 68b2cad..f1cb62e 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -117,30 +117,35 @@
     public void installRunnable(final OperatorInstanceId opIId) {
         pendingOperators.add(opIId);
         final OperatorRunnable hon = honMap.get(opIId);
+        joblet.incrementOperatorCount();
         joblet.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
                 try {
-                    waitUntilStarted();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                    return;
-                }
-                if (abort) {
-                    return;
-                }
-                try {
-                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                            + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
-                    hon.run();
-                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                            + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
-                    notifyOperatorCompletion(opIId);
-                } catch (Exception e) {
-                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                            + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
-                    e.printStackTrace();
-                    notifyOperatorFailure(opIId);
+                    try {
+                        waitUntilStarted();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                        return;
+                    }
+                    if (abort) {
+                        return;
+                    }
+                    try {
+                        LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                                + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
+                        hon.run();
+                        LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                                + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
+                        notifyOperatorCompletion(opIId);
+                    } catch (Exception e) {
+                        LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                                + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
+                        e.printStackTrace();
+                        notifyOperatorFailure(opIId);
+                    }
+                } finally {
+                    joblet.decrementOperatorCount();
                 }
             }
         });
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java
index e49f435..18dd4be 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
 import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public class ConnectionEntry implements IConnectionEntry {
     private static final Logger LOGGER = Logger.getLogger(ConnectionEntry.class.getName());
@@ -111,12 +112,13 @@
     }
 
     @Override
-    public synchronized void write(ByteBuffer buffer) {
+    public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
         while (buffer.remaining() > 0) {
             while (writeBuffer.remaining() <= 0) {
                 try {
                     wait();
                 } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
                 }
             }
             int oldLimit = buffer.limit();
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
index 32c8991..ecaf013 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
@@ -95,11 +95,12 @@
     }
 
     @Override
-    public synchronized IConnectionEntry findNextReadyEntry(int lastReadSender) {
+    public synchronized IConnectionEntry findNextReadyEntry(int lastReadSender) throws HyracksDataException {
         while (openSenderCount > 0 && readyBits.isEmpty()) {
             try {
                 wait();
             } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
             }
         }
         lastReadSender = readyBits.nextSetBit(lastReadSender);
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index 8edd992..0362583 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -15,6 +15,8 @@
 package edu.uci.ics.hyracks.control.nc.runtime;
 
 import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 
@@ -31,7 +33,8 @@
     private final int nInputs;
     private final Executor executor;
     private IFrameReader[] readers;
-    private volatile boolean abort;
+    private boolean abort;
+    private Set<Thread> opThreads;
 
     public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode, int nInputs, Executor executor) {
         this.ctx = ctx;
@@ -39,6 +42,7 @@
         this.nInputs = nInputs;
         this.executor = executor;
         readers = new IFrameReader[nInputs];
+        opThreads = new HashSet<Thread>();
     }
 
     public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
@@ -49,8 +53,30 @@
         this.readers[inputIdx] = reader;
     }
 
-    public void abort() {
+    public synchronized void abort() { // sets the abort flag and interrupts every thread currently in opThreads
+        if (abort) {
+            return; // already aborted: do not interrupt the registered threads a second time
+        }
         abort = true;
+        for (Thread t : opThreads) {
+            t.interrupt(); // unblocks threads waiting in interruptible calls
+        }
+    }
+
+    private synchronized boolean aborted() { // reads the abort flag under the same monitor abort() writes it under
+        return abort;
+    }
+
+    private synchronized void addOperatorThread(Thread t) { // registers t so that a later abort() interrupts it
+        if (abort) {
+            t.interrupt(); // abort() has already run and will not iterate again, so interrupt t here instead
+            return;
+        }
+        opThreads.add(t);
+    }
+
+    private synchronized void removeOperatorThread(Thread t) { // deregisters t so that abort() no longer interrupts it
+        opThreads.remove(t); // was add(t): threads were never removed, so abort() could interrupt one that had already left pushFrames
     }
 
     @Override
@@ -66,7 +92,9 @@
                     executor.execute(new Runnable() {
                         public void run() {
                             try {
-                                pushFrames(reader, writer);
+                                if (!aborted()) {
+                                    pushFrames(reader, writer);
+                                }
                             } catch (HyracksDataException e) {
                             } finally {
                                 sem.release();
@@ -87,19 +115,27 @@
     }
 
     private void pushFrames(IFrameReader reader, IFrameWriter writer) throws HyracksDataException {
-        ByteBuffer buffer = ctx.allocateFrame();
-        writer.open();
-        reader.open();
-        while (reader.nextFrame(buffer)) {
-            if (abort) {
-                break;
+        addOperatorThread(Thread.currentThread()); // register the calling thread so abort() can interrupt it
+        try {
+            ByteBuffer buffer = ctx.allocateFrame();
+            writer.open();
+            reader.open();
+            while (reader.nextFrame(buffer)) {
+                if (aborted()) {
+                    break; // stop forwarding frames once abort() has been called
+                }
+                buffer.flip();
+                writer.nextFrame(buffer);
+                buffer.compact();
             }
-            buffer.flip();
-            writer.nextFrame(buffer);
-            buffer.compact();
+            reader.close(); // NOTE(review): not reached when the try block throws; neither close() call is in the finally
+            writer.close();
+        } finally {
+            removeOperatorThread(Thread.currentThread()); // runs on every exit path, normal or exceptional
+            if (Thread.interrupted()) { // Thread.interrupted() also clears the thread's interrupt flag
+                throw new HyracksDataException("Thread interrupted"); // NOTE(review): thrown from finally, so it replaces any exception already propagating from the try block
+            }
         }
-        reader.close();
-        writer.close();
     }
 
     @Override