Made Joblet Cleanup wait for all pending operators to stop before notifying event listener

git-svn-id: https://hyracks.googlecode.com/svn/trunk@584 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index bb112ed..d0e02e1 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -63,6 +63,8 @@
     private final IWorkspaceFileFactory fileFactory;
 
     private IJobletEventListener jobletEventListener;
+    
+    private int nPendingOperators;
 
     public Joblet(NodeControllerService nodeController, UUID jobId, int attempt, INCApplicationContext appCtx) {
         this.nodeController = nodeController;
@@ -74,6 +76,7 @@
         counterMap = new Hashtable<String, Counter>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
+        nPendingOperators = 0;
     }
 
     @Override
@@ -221,4 +224,21 @@
     public void setJobletEventListener(IJobletEventListener jobletEventListener) {
         this.jobletEventListener = jobletEventListener;
     }
+
+    public synchronized void incrementOperatorCount() {
+        ++nPendingOperators;
+    }
+
+    public synchronized void decrementOperatorCount() {
+        --nPendingOperators;
+        if (nPendingOperators == 0) {
+            notifyAll();
+        }
+    }
+    
+    public synchronized void waitForPendingOperators() throws InterruptedException {
+        while(nPendingOperators > 0) {
+            wait();
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 25bf741..239fee1 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -472,6 +472,7 @@
         }
         Joblet joblet = jobletMap.remove(jobId);
         if (joblet != null) {
+            joblet.waitForPendingOperators();
             IJobletEventListener listener = joblet.getJobletEventListener();
             if (listener != null) {
                 listener.jobletFinish(success);
@@ -583,6 +584,7 @@
         if (ji != null) {
             if (ji.getAttempt() == attempt) {
                 Joblet joblet = jobletMap.remove(jobId);
+                joblet.waitForPendingOperators();
                 IJobletEventListener listener = joblet.getJobletEventListener();
                 if (listener != null) {
                     listener.jobletFinish(false);
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index 68b2cad..f1cb62e 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -117,30 +117,35 @@
     public void installRunnable(final OperatorInstanceId opIId) {
         pendingOperators.add(opIId);
         final OperatorRunnable hon = honMap.get(opIId);
+        joblet.incrementOperatorCount();
         joblet.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
                 try {
-                    waitUntilStarted();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                    return;
-                }
-                if (abort) {
-                    return;
-                }
-                try {
-                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                            + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
-                    hon.run();
-                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                            + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
-                    notifyOperatorCompletion(opIId);
-                } catch (Exception e) {
-                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                            + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
-                    e.printStackTrace();
-                    notifyOperatorFailure(opIId);
+                    try {
+                        waitUntilStarted();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                        return;
+                    }
+                    if (abort) {
+                        return;
+                    }
+                    try {
+                        LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                                + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
+                        hon.run();
+                        LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                                + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
+                        notifyOperatorCompletion(opIId);
+                    } catch (Exception e) {
+                        LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                                + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
+                        e.printStackTrace();
+                        notifyOperatorFailure(opIId);
+                    }
+                } finally {
+                    joblet.decrementOperatorCount();
                 }
             }
         });