Undoing waiting of job cleanup for all operators
git-svn-id: https://hyracks.googlecode.com/svn/trunk@586 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index d0e02e1..1001161 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -64,8 +64,6 @@
private IJobletEventListener jobletEventListener;
- private int nPendingOperators;
-
public Joblet(NodeControllerService nodeController, UUID jobId, int attempt, INCApplicationContext appCtx) {
this.nodeController = nodeController;
this.appCtx = appCtx;
@@ -76,7 +74,6 @@
counterMap = new Hashtable<String, Counter>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
- nPendingOperators = 0;
}
@Override
@@ -224,21 +221,4 @@
public void setJobletEventListener(IJobletEventListener jobletEventListener) {
this.jobletEventListener = jobletEventListener;
}
-
- public synchronized void incrementOperatorCount() {
- ++nPendingOperators;
- }
-
- public synchronized void decrementOperatorCount() {
- --nPendingOperators;
- if (nPendingOperators == 0) {
- notifyAll();
- }
- }
-
- public synchronized void waitForPendingOperators() throws InterruptedException {
- while(nPendingOperators > 0) {
- wait();
- }
- }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 6a91730..90b2a4d 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -472,7 +472,6 @@
}
Joblet joblet = jobletMap.remove(jobId);
if (joblet != null) {
- joblet.waitForPendingOperators();
IJobletEventListener listener = joblet.getJobletEventListener();
if (listener != null) {
listener.jobletFinish(success);
@@ -583,17 +582,16 @@
Joblet ji = jobletMap.get(jobId);
if (ji != null) {
if (ji.getAttempt() == attempt) {
+ Joblet joblet = jobletMap.remove(jobId);
+ IJobletEventListener listener = joblet.getJobletEventListener();
+ if (listener != null) {
+ listener.jobletFinish(false);
+ }
for (Stagelet stagelet : ji.getStageletMap().values()) {
stagelet.abort();
stagelet.close();
connectionManager.abortConnections(jobId, stagelet.getStageId());
}
- Joblet joblet = jobletMap.remove(jobId);
- joblet.waitForPendingOperators();
- IJobletEventListener listener = joblet.getJobletEventListener();
- if (listener != null) {
- listener.jobletFinish(false);
- }
}
}
}
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index f1cb62e..68b2cad 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -117,35 +117,30 @@
public void installRunnable(final OperatorInstanceId opIId) {
pendingOperators.add(opIId);
final OperatorRunnable hon = honMap.get(opIId);
- joblet.incrementOperatorCount();
joblet.getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
- try {
- waitUntilStarted();
- } catch (InterruptedException e) {
- e.printStackTrace();
- return;
- }
- if (abort) {
- return;
- }
- try {
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
- hon.run();
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
- notifyOperatorCompletion(opIId);
- } catch (Exception e) {
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
- e.printStackTrace();
- notifyOperatorFailure(opIId);
- }
- } finally {
- joblet.decrementOperatorCount();
+ waitUntilStarted();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return;
+ }
+ if (abort) {
+ return;
+ }
+ try {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
+ hon.run();
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
+ notifyOperatorCompletion(opIId);
+ } catch (Exception e) {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
+ e.printStackTrace();
+ notifyOperatorFailure(opIId);
}
}
});