Added waiting for all operators before calling event listener
git-svn-id: https://hyracks.googlecode.com/svn/trunk@589 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionDemultiplexer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionDemultiplexer.java
index 0587722..5e6cf48 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionDemultiplexer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionDemultiplexer.java
@@ -19,7 +19,7 @@
public interface IConnectionDemultiplexer {
public int getSenderCount();
- public IConnectionEntry findNextReadyEntry(int lastReadSender);
+ public IConnectionEntry findNextReadyEntry(int lastReadSender) throws HyracksDataException;
public void unreadyEntry(int index);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
index 9e3cbbe..2acf8e5 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
@@ -19,6 +19,8 @@
import java.nio.channels.SelectionKey;
import java.util.UUID;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
public interface IConnectionEntry {
ByteBuffer getReadBuffer();
@@ -32,7 +34,7 @@
void close() throws IOException;
- void write(ByteBuffer buffer);
+ void write(ByteBuffer buffer) throws HyracksDataException;
UUID getJobId();
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 1001161..de53abe 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -63,7 +63,9 @@
private final IWorkspaceFileFactory fileFactory;
private IJobletEventListener jobletEventListener;
-
+
+ private int nPendingOperators;
+
public Joblet(NodeControllerService nodeController, UUID jobId, int attempt, INCApplicationContext appCtx) {
this.nodeController = nodeController;
this.appCtx = appCtx;
@@ -74,6 +76,7 @@
counterMap = new Hashtable<String, Counter>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
+ nPendingOperators = 0;
}
@Override
@@ -221,4 +224,21 @@
public void setJobletEventListener(IJobletEventListener jobletEventListener) {
this.jobletEventListener = jobletEventListener;
}
+
+ public synchronized void incrementOperatorCount() {
+ ++nPendingOperators;
+ }
+
+ public synchronized void decrementOperatorCount() {
+ --nPendingOperators;
+ if (nPendingOperators <= 0) {
+ notifyAll();
+ }
+ }
+
+ public synchronized void waitForPendingOperators() throws InterruptedException {
+ while (nPendingOperators > 0) {
+ wait();
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 90b2a4d..6d610ac 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -438,14 +438,14 @@
si.setEndpointList(null);
}
- private Joblet getLocalJoblet(UUID jobId) throws Exception {
+ private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
Joblet ji = jobletMap.get(jobId);
return ji;
}
private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx, JobPlan plan)
throws Exception {
- synchronized (jobletMap) {
+ synchronized (this) {
Joblet ji = jobletMap.get(jobId);
if (ji == null || ji.getAttempt() != attempt) {
ji = new Joblet(this, jobId, attempt, appCtx);
@@ -470,8 +470,12 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Cleaning up after job: " + jobId);
}
- Joblet joblet = jobletMap.remove(jobId);
+ Joblet joblet;
+ synchronized (this) {
+ joblet = jobletMap.remove(jobId);
+ }
if (joblet != null) {
+ joblet.waitForPendingOperators();
IJobletEventListener listener = joblet.getJobletEventListener();
if (listener != null) {
listener.jobletFinish(success);
@@ -483,7 +487,10 @@
@Override
public void startStage(UUID jobId, UUID stageId) throws Exception {
- Joblet ji = jobletMap.get(jobId);
+ Joblet ji;
+ synchronized (this) {
+ ji = jobletMap.get(jobId);
+ }
if (ji != null) {
Stagelet s = ji.getStagelet(stageId);
if (s != null) {
@@ -579,19 +586,25 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Aborting Job: " + jobId + ":" + attempt);
}
- Joblet ji = jobletMap.get(jobId);
+ Joblet ji;
+ synchronized (this) {
+ ji = jobletMap.get(jobId);
+ }
if (ji != null) {
if (ji.getAttempt() == attempt) {
- Joblet joblet = jobletMap.remove(jobId);
- IJobletEventListener listener = joblet.getJobletEventListener();
- if (listener != null) {
- listener.jobletFinish(false);
- }
for (Stagelet stagelet : ji.getStageletMap().values()) {
stagelet.abort();
stagelet.close();
connectionManager.abortConnections(jobId, stagelet.getStageId());
}
+ synchronized (this) {
+ jobletMap.remove(jobId);
+ }
+ ji.waitForPendingOperators();
+ IJobletEventListener listener = ji.getJobletEventListener();
+ if (listener != null) {
+ listener.jobletFinish(false);
+ }
}
}
}
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index 68b2cad..f1cb62e 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -117,30 +117,35 @@
public void installRunnable(final OperatorInstanceId opIId) {
pendingOperators.add(opIId);
final OperatorRunnable hon = honMap.get(opIId);
+ joblet.incrementOperatorCount();
joblet.getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
- waitUntilStarted();
- } catch (InterruptedException e) {
- e.printStackTrace();
- return;
- }
- if (abort) {
- return;
- }
- try {
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
- hon.run();
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
- notifyOperatorCompletion(opIId);
- } catch (Exception e) {
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
- e.printStackTrace();
- notifyOperatorFailure(opIId);
+ try {
+ waitUntilStarted();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return;
+ }
+ if (abort) {
+ return;
+ }
+ try {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + "(" + hon + ")" + ": STARTED");
+ hon.run();
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + "(" + hon + ")" + ": FINISHED");
+ notifyOperatorCompletion(opIId);
+ } catch (Exception e) {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + "(" + hon + ")" + ": ABORTED");
+ e.printStackTrace();
+ notifyOperatorFailure(opIId);
+ }
+ } finally {
+ joblet.decrementOperatorCount();
}
}
});
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java
index e49f435..18dd4be 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionEntry.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class ConnectionEntry implements IConnectionEntry {
private static final Logger LOGGER = Logger.getLogger(ConnectionEntry.class.getName());
@@ -111,12 +112,13 @@
}
@Override
- public synchronized void write(ByteBuffer buffer) {
+ public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
while (buffer.remaining() > 0) {
while (writeBuffer.remaining() <= 0) {
try {
wait();
} catch (InterruptedException e) {
+ throw new HyracksDataException(e);
}
}
int oldLimit = buffer.limit();
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
index 32c8991..ecaf013 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/DemuxDataReceiveListenerFactory.java
@@ -95,11 +95,12 @@
}
@Override
- public synchronized IConnectionEntry findNextReadyEntry(int lastReadSender) {
+ public synchronized IConnectionEntry findNextReadyEntry(int lastReadSender) throws HyracksDataException {
while (openSenderCount > 0 && readyBits.isEmpty()) {
try {
wait();
} catch (InterruptedException e) {
+ throw new HyracksDataException(e);
}
}
lastReadSender = readyBits.nextSetBit(lastReadSender);
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index 8edd992..0362583 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.control.nc.runtime;
import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
@@ -31,7 +33,8 @@
private final int nInputs;
private final Executor executor;
private IFrameReader[] readers;
- private volatile boolean abort;
+ private boolean abort;
+ private Set<Thread> opThreads;
public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode, int nInputs, Executor executor) {
this.ctx = ctx;
@@ -39,6 +42,7 @@
this.nInputs = nInputs;
this.executor = executor;
readers = new IFrameReader[nInputs];
+ opThreads = new HashSet<Thread>();
}
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
@@ -49,8 +53,30 @@
this.readers[inputIdx] = reader;
}
- public void abort() {
+ public synchronized void abort() {
+ if (abort) {
+ return;
+ }
abort = true;
+ for (Thread t : opThreads) {
+ t.interrupt();
+ }
+ }
+
+ private synchronized boolean aborted() {
+ return abort;
+ }
+
+ private synchronized void addOperatorThread(Thread t) {
+ if (abort) {
+ t.interrupt();
+ return;
+ }
+ opThreads.add(t);
+ }
+
+ private synchronized void removeOperatorThread(Thread t) {
+ opThreads.add(t);
}
@Override
@@ -66,7 +92,9 @@
executor.execute(new Runnable() {
public void run() {
try {
- pushFrames(reader, writer);
+ if (!aborted()) {
+ pushFrames(reader, writer);
+ }
} catch (HyracksDataException e) {
} finally {
sem.release();
@@ -87,19 +115,27 @@
}
private void pushFrames(IFrameReader reader, IFrameWriter writer) throws HyracksDataException {
- ByteBuffer buffer = ctx.allocateFrame();
- writer.open();
- reader.open();
- while (reader.nextFrame(buffer)) {
- if (abort) {
- break;
+ addOperatorThread(Thread.currentThread());
+ try {
+ ByteBuffer buffer = ctx.allocateFrame();
+ writer.open();
+ reader.open();
+ while (reader.nextFrame(buffer)) {
+ if (aborted()) {
+ break;
+ }
+ buffer.flip();
+ writer.nextFrame(buffer);
+ buffer.compact();
}
- buffer.flip();
- writer.nextFrame(buffer);
- buffer.compact();
+ reader.close();
+ writer.close();
+ } finally {
+ removeOperatorThread(Thread.currentThread());
+ if (Thread.interrupted()) {
+ throw new HyracksDataException("Thread interrupted");
+ }
}
- reader.close();
- writer.close();
}
@Override