Made Joblet Cleanup wait for all pending operators to stop before notifying event listener
git-svn-id: https://hyracks.googlecode.com/svn/trunk@584 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index bb112ed..d0e02e1 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -63,6 +63,8 @@
private final IWorkspaceFileFactory fileFactory;
private IJobletEventListener jobletEventListener;
+
+ private int nPendingOperators;
public Joblet(NodeControllerService nodeController, UUID jobId, int attempt, INCApplicationContext appCtx) {
this.nodeController = nodeController;
@@ -74,6 +76,7 @@
counterMap = new Hashtable<String, Counter>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
+ nPendingOperators = 0;
}
@Override
@@ -221,4 +224,21 @@
public void setJobletEventListener(IJobletEventListener jobletEventListener) {
this.jobletEventListener = jobletEventListener;
}
+
+ public synchronized void incrementOperatorCount() { // Records one more pending operator; synchronized on this Joblet.
+ ++nPendingOperators; // Stagelet.installRunnable calls this just before handing the operator's Runnable to the executor.
+ }
+
+ public synchronized void decrementOperatorCount() { // Records that one pending operator is done; invoked from the finally block of the operator Runnable.
+ --nPendingOperators;
+ if (nPendingOperators == 0) { // The last pending operator has finished.
+ notifyAll(); // Wake every thread blocked in waitForPendingOperators().
+ }
+ }
+
+ public synchronized void waitForPendingOperators() throws InterruptedException { // Blocks the caller until the pending-operator count is no longer positive.
+ while(nPendingOperators > 0) { // Re-check after every wakeup; wait() may return while operators are still pending.
+ wait(); // Releases this Joblet's monitor while blocked; decrementOperatorCount() calls notifyAll() when the count reaches zero.
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 25bf741..239fee1 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -472,6 +472,7 @@
}
Joblet joblet = jobletMap.remove(jobId);
if (joblet != null) {
+ joblet.waitForPendingOperators();
IJobletEventListener listener = joblet.getJobletEventListener();
if (listener != null) {
listener.jobletFinish(success);
@@ -583,6 +584,7 @@
if (ji != null) {
if (ji.getAttempt() == attempt) {
Joblet joblet = jobletMap.remove(jobId);
+ joblet.waitForPendingOperators();
IJobletEventListener listener = joblet.getJobletEventListener();
if (listener != null) {
listener.jobletFinish(false);
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index 68b2cad..f1cb62e 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -117,30 +117,35 @@
public void installRunnable(final OperatorInstanceId opIId) {
pendingOperators.add(opIId);
final OperatorRunnable hon = honMap.get(opIId);
+ joblet.incrementOperatorCount();
joblet.getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
- waitUntilStarted();
- } catch (InterruptedException e) {
- e.printStackTrace();
- return;
- }
- if (abort) {
- return;
- }
- try {
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
- hon.run();
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
- notifyOperatorCompletion(opIId);
- } catch (Exception e) {
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
- e.printStackTrace();
- notifyOperatorFailure(opIId);
+ try {
+ waitUntilStarted();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return;
+ }
+ if (abort) {
+ return;
+ }
+ try {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
+ hon.run();
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
+ notifyOperatorCompletion(opIId);
+ } catch (Exception e) {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
+ e.printStackTrace();
+ notifyOperatorFailure(opIId);
+ }
+ } finally {
+ joblet.decrementOperatorCount();
}
}
});