Undoing waiting of job cleanup for all operators

git-svn-id: https://hyracks.googlecode.com/svn/trunk@586 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index d0e02e1..1001161 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -64,8 +64,6 @@
 
     private IJobletEventListener jobletEventListener;
     
-    private int nPendingOperators;
-
     public Joblet(NodeControllerService nodeController, UUID jobId, int attempt, INCApplicationContext appCtx) {
         this.nodeController = nodeController;
         this.appCtx = appCtx;
@@ -76,7 +74,6 @@
         counterMap = new Hashtable<String, Counter>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
-        nPendingOperators = 0;
     }
 
     @Override
@@ -224,21 +221,4 @@
     public void setJobletEventListener(IJobletEventListener jobletEventListener) {
         this.jobletEventListener = jobletEventListener;
     }
-
-    public synchronized void incrementOperatorCount() {
-        ++nPendingOperators;
-    }
-
-    public synchronized void decrementOperatorCount() {
-        --nPendingOperators;
-        if (nPendingOperators == 0) {
-            notifyAll();
-        }
-    }
-    
-    public synchronized void waitForPendingOperators() throws InterruptedException {
-        while(nPendingOperators > 0) {
-            wait();
-        }
-    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 6a91730..90b2a4d 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -472,7 +472,6 @@
         }
         Joblet joblet = jobletMap.remove(jobId);
         if (joblet != null) {
-            joblet.waitForPendingOperators();
             IJobletEventListener listener = joblet.getJobletEventListener();
             if (listener != null) {
                 listener.jobletFinish(success);
@@ -583,17 +582,16 @@
         Joblet ji = jobletMap.get(jobId);
         if (ji != null) {
             if (ji.getAttempt() == attempt) {
+                Joblet joblet = jobletMap.remove(jobId);
+                IJobletEventListener listener = joblet.getJobletEventListener();
+                if (listener != null) {
+                    listener.jobletFinish(false);
+                }
                 for (Stagelet stagelet : ji.getStageletMap().values()) {
                     stagelet.abort();
                     stagelet.close();
                     connectionManager.abortConnections(jobId, stagelet.getStageId());
                 }
-                Joblet joblet = jobletMap.remove(jobId);
-                joblet.waitForPendingOperators();
-                IJobletEventListener listener = joblet.getJobletEventListener();
-                if (listener != null) {
-                    listener.jobletFinish(false);
-                }
             }
         }
     }
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index f1cb62e..68b2cad 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -117,35 +117,30 @@
     public void installRunnable(final OperatorInstanceId opIId) {
         pendingOperators.add(opIId);
         final OperatorRunnable hon = honMap.get(opIId);
-        joblet.incrementOperatorCount();
         joblet.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
                 try {
-                    try {
-                        waitUntilStarted();
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                        return;
-                    }
-                    if (abort) {
-                        return;
-                    }
-                    try {
-                        LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                                + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
-                        hon.run();
-                        LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                                + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
-                        notifyOperatorCompletion(opIId);
-                    } catch (Exception e) {
-                        LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                                + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
-                        e.printStackTrace();
-                        notifyOperatorFailure(opIId);
-                    }
-                } finally {
-                    joblet.decrementOperatorCount();
+                    waitUntilStarted();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                    return;
+                }
+                if (abort) {
+                    return;
+                }
+                try {
+                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                            + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
+                    hon.run();
+                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                            + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
+                    notifyOperatorCompletion(opIId);
+                } catch (Exception e) {
+                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                            + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
+                    e.printStackTrace();
+                    notifyOperatorFailure(opIId);
                 }
             }
         });