Added code for fault tolerance

Stages are now started with an attempt number that is threaded through
joblet initialization and stagelet-completion notifications. Node
controllers, stagelets, operator runnables, and the connection manager
can abort a running stage attempt, and the JOL scheduler tracks failed
nodes, sends abort messages to the surviving nodes of a failed attempt,
waits for their acknowledgements, and restarts the stage with the next
attempt number.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@19 123451ca-8445-de46-9d55-352943316053
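
For orientation, the retry protocol this patch introduces, in one self-contained Java sketch (not part of the patch; StageRunner and every other name below are illustrative assumptions): each stage attempt carries an attempt number, a failed attempt is aborted on every node that participated in it, and the stage is then restarted as attempt + 1.

    import java.util.UUID;

    // Illustrative only: mirrors the attempt/abort cycle added by this patch.
    public final class StageAttemptLoopSketch {
        interface StageRunner {
            /** Runs one attempt of a stage; returns true on success, false if the attempt failed. */
            boolean runAttempt(UUID jobId, UUID stageId, int attempt) throws Exception;

            /** Tells every node that participated in the attempt to abort it. */
            void abortAttempt(UUID jobId, UUID stageId, int attempt) throws Exception;
        }

        public static void runStage(StageRunner runner, UUID jobId, UUID stageId, int maxAttempts) throws Exception {
            for (int attempt = 0; attempt < maxAttempts; ++attempt) {
                if (runner.runAttempt(jobId, stageId, attempt)) {
                    return; // stage completed on this attempt
                }
                // Failed attempt: abort it everywhere before retrying as attempt + 1.
                runner.abortAttempt(jobId, stageId, attempt);
            }
            throw new Exception("Stage " + stageId + " of job " + jobId + " failed after " + maxAttempts + " attempts");
        }
    }

The real scheduler expresses this loop declaratively in scheduler.olg (stagestart_AGAIN) and imposes no attempt cap; maxAttempts only keeps the sketch finite.
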
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
index 125c1aa..9e3cbbe 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
+import java.util.UUID;
public interface IConnectionEntry {
ByteBuffer getReadBuffer();
@@ -32,4 +33,16 @@
void close() throws IOException;
void write(ByteBuffer buffer);
+
+ UUID getJobId();
+
+ UUID getStageId();
+
+ void setJobId(UUID jobId);
+
+ void setStageId(UUID stageId);
+
+ boolean aborted();
+
+ void abort();
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
index 91f921c..7e74aee 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
@@ -18,4 +18,8 @@
public interface IDataReceiveListenerFactory {
public IDataReceiveListener getDataReceiveListener(UUID endpointUUID, IConnectionEntry entry, int senderIndex);
+
+ public UUID getJobId();
+
+ public UUID getStageId();
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
index 19e81f1..81b16d9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
@@ -30,8 +30,8 @@
public void unregisterNode(INodeController nodeController) throws Exception;
- public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
- throws Exception;
+ public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+ StageletStatistics statistics) throws Exception;
public void nodeHeartbeat(String id) throws Exception;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
index e7152f9..d349edf 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
@@ -32,7 +32,7 @@
public NodeCapability getNodeCapability() throws Exception;
- public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId,
+ public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
Set<ActivityNodeId> activities) throws Exception;
public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> activities,
@@ -40,6 +40,8 @@
public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception;
+ public void abortJoblet(UUID jobId, UUID stageId) throws Exception;
+
public void cleanUpJob(UUID jobId) throws Exception;
public void startStage(UUID jobId, UUID stageId) throws Exception;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
index 4d9c309..89f7566 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -18,10 +18,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.hyracks.api.comm.FrameConstants;
import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
import edu.uci.ics.hyracks.context.HyracksContext;
@@ -41,6 +41,12 @@
private final SelectionKey key;
+ private UUID jobId;
+
+ private UUID stageId;
+
+ private boolean aborted;
+
public ConnectionEntry(HyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
this.socketChannel = socketChannel;
readBuffer = ctx.getResourceManager().allocateFrame();
@@ -55,6 +61,9 @@
}
public boolean dispatch(SelectionKey key) throws IOException {
+ if (aborted) {
+ recvListener.dataReceived(this);
+ }
if (key.isReadable()) {
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
@@ -135,12 +144,46 @@
}
@Override
- public void close() throws IOException {
- socketChannel.close();
+ public void close() {
+ try {
+ socketChannel.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
@Override
public SelectionKey getSelectionKey() {
return key;
}
+
+ @Override
+ public UUID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public void setJobId(UUID jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public UUID getStageId() {
+ return stageId;
+ }
+
+ @Override
+ public void setStageId(UUID stageId) {
+ this.stageId = stageId;
+ }
+
+ @Override
+ public void abort() {
+ aborted = true;
+ }
+
+ @Override
+ public boolean aborted() {
+ return aborted;
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
index bfca6ad..464a95c 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
@@ -26,9 +26,11 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -62,6 +64,8 @@
private final IDataReceiveListener initialDataReceiveListener;
+ private final Set<IConnectionEntry> connections;
+
private volatile boolean stopped;
private ByteBuffer emptyFrame;
@@ -76,7 +80,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Connection manager listening on " + serverSocket.getInetAddress() + ":"
- + serverSocket.getLocalPort());
+ + serverSocket.getLocalPort());
}
pendingConnectionReceivers = new HashMap<UUID, IDataReceiveListenerFactory>();
@@ -85,6 +89,7 @@
initialDataReceiveListener = new InitialDataReceiveListener();
emptyFrame = ctx.getResourceManager().allocateFrame();
emptyFrame.putInt(FrameHelper.getTupleCountOffset(ctx), 0);
+ connections = new HashSet<IConnectionEntry>();
}
public synchronized void dumpStats() {
@@ -116,7 +121,7 @@
public IFrameWriter connect(NetworkAddress address, UUID id, int senderId) throws HyracksDataException {
try {
SocketChannel channel = SocketChannel
- .open(new InetSocketAddress(address.getIpAddress(), address.getPort()));
+ .open(new InetSocketAddress(address.getIpAddress(), address.getPort()));
byte[] initialFrame = new byte[INITIAL_MESSAGE_LEN];
ByteBuffer buffer = ByteBuffer.wrap(initialFrame);
buffer.clear();
@@ -173,6 +178,20 @@
pendingConnectionReceivers.remove(id);
}
+ public synchronized void abortConnections(UUID jobId, UUID stageId) {
+ List<IConnectionEntry> abortConnections = new ArrayList<IConnectionEntry>();
+ synchronized (this) {
+ for (IConnectionEntry ce : connections) {
+ if (ce.getJobId().equals(jobId) && ce.getStageId().equals(stageId)) {
+ abortConnections.add(ce);
+ }
+ }
+ }
+ synchronized (dataListenerThread) {
+ dataListenerThread.pendingAbortConnections.addAll(abortConnections);
+ }
+ }
+
private final class NetworkFrameWriter implements IFrameWriter {
private SocketChannel channel;
@@ -237,7 +256,8 @@
private final class DataListenerThread extends Thread {
private Selector selector;
- private List<SocketChannel> pendingSockets;
+ private List<SocketChannel> pendingNewSockets;
+ private List<IConnectionEntry> pendingAbortConnections;
public DataListenerThread() {
super("Hyracks Data Listener Thread");
@@ -246,12 +266,13 @@
} catch (IOException e) {
throw new RuntimeException(e);
}
- pendingSockets = new ArrayList<SocketChannel>();
+ pendingNewSockets = new ArrayList<SocketChannel>();
+ pendingAbortConnections = new ArrayList<IConnectionEntry>();
}
synchronized void addSocketChannel(SocketChannel sc) throws IOException {
LOGGER.info("Connection received");
- pendingSockets.add(sc);
+ pendingNewSockets.add(sc);
selector.wakeup();
}
@@ -264,8 +285,8 @@
}
int n = selector.select();
synchronized (this) {
- if (!pendingSockets.isEmpty()) {
- for (SocketChannel sc : pendingSockets) {
+ if (!pendingNewSockets.isEmpty()) {
+ for (SocketChannel sc : pendingNewSockets) {
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
ConnectionEntry entry = new ConnectionEntry(ctx, sc, scKey);
@@ -275,7 +296,19 @@
LOGGER.fine("Woke up selector");
}
}
- pendingSockets.clear();
+ pendingNewSockets.clear();
+ }
+ if (!pendingAbortConnections.isEmpty()) {
+ for (IConnectionEntry ce : pendingAbortConnections) {
+ SelectionKey key = ce.getSelectionKey();
+ ce.abort();
+ ((ConnectionEntry) ce).dispatch(key);
+ key.cancel();
+ ce.close();
+ synchronized (ConnectionManager.this) {
+ connections.remove(ce);
+ }
+ }
}
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Selector: " + n);
@@ -295,6 +328,9 @@
if (close) {
key.cancel();
entry.close();
+ synchronized (ConnectionManager.this) {
+ connections.remove(entry);
+ }
}
}
}
@@ -331,6 +367,11 @@
newListener = connectionReceiver.getDataReceiveListener(endpointID, entry, senderId);
entry.setDataReceiveListener(newListener);
+ entry.setJobId(connectionReceiver.getJobId());
+ entry.setStageId(connectionReceiver.getStageId());
+ synchronized (ConnectionManager.this) {
+ connections.add(entry);
+ }
byte[] ack = new byte[4];
ByteBuffer ackBuffer = ByteBuffer.wrap(ack);
ackBuffer.clear();
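
The abort path reuses the pattern the connection manager already uses for new sockets: callers never touch the Selector directly, they append to a pending list and wake the selector so the data-listener thread applies the change on its own thread. A stand-alone sketch of that handoff (SelectorHandoffSketch and its members are assumptions, not Hyracks classes):

    import java.io.IOException;
    import java.nio.channels.Selector;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: the "queue work for the selector thread" pattern used for both
    // pendingNewSockets and pendingAbortConnections above.
    public final class SelectorHandoffSketch implements Runnable {
        private final Selector selector;
        private final List<Runnable> pendingWork = new ArrayList<Runnable>();

        public SelectorHandoffSketch() throws IOException {
            selector = Selector.open();
        }

        /** Called from any thread: enqueue work and wake the selector so it is picked up promptly. */
        public synchronized void submit(Runnable work) {
            pendingWork.add(work);
            selector.wakeup();
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    selector.select();
                    List<Runnable> work;
                    synchronized (this) {
                        work = new ArrayList<Runnable>(pendingWork);
                        pendingWork.clear();
                    }
                    for (Runnable r : work) {
                        r.run(); // e.g. register a new channel, or abort and close a connection entry
                    }
                    // ... process selected keys here ...
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

Keeping key registration, cancellation, and ConnectionEntry.close() on the selector thread avoids racing the select loop.
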
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
index 3eb2280..2cfcb94 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
@@ -32,7 +32,7 @@
import edu.uci.ics.hyracks.context.HyracksContext;
public class DemuxDataReceiveListenerFactory implements IDataReceiveListenerFactory, IConnectionDemultiplexer,
- IDataReceiveListener {
+ IDataReceiveListener {
private static final Logger LOGGER = Logger.getLogger(DemuxDataReceiveListenerFactory.class.getName());
private final NonDeterministicFrameReader frameReader;
@@ -40,10 +40,14 @@
private final BitSet readyBits;
private IConnectionEntry senders[];
private int openSenderCount;
+ private UUID jobId;
+ private UUID stageId;
- public DemuxDataReceiveListenerFactory(HyracksContext ctx) {
+ public DemuxDataReceiveListenerFactory(HyracksContext ctx, UUID jobId, UUID stageId) {
frameReader = new NonDeterministicFrameReader(ctx, this);
this.ctx = ctx;
+ this.jobId = jobId;
+ this.stageId = stageId;
readyBits = new BitSet();
senders = null;
openSenderCount = 0;
@@ -66,7 +70,7 @@
ByteBuffer buffer = entry.getReadBuffer();
buffer.flip();
int dataLen = buffer.remaining();
- if (dataLen >= ctx.getFrameSize()) {
+ if (dataLen >= ctx.getFrameSize() || entry.aborted()) {
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("NonDeterministicDataReceiveListener: frame received: sender = " + senderIndex);
}
@@ -139,4 +143,14 @@
public synchronized int getSenderCount() {
return senders.length;
}
+
+ @Override
+ public UUID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public UUID getStageId() {
+ return stageId;
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
index 3f2d449..91350735 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
@@ -55,6 +55,10 @@
}
while (true) {
IConnectionEntry entry = demux.findNextReadyEntry(lastReadSender);
+ if (entry.aborted()) {
+ eos = true;
+ return false;
+ }
lastReadSender = (Integer) entry.getAttachment();
ByteBuffer netBuffer = entry.getReadBuffer();
int tupleCount = netBuffer.getInt(FrameHelper.getTupleCountOffset(ctx));
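
An aborted connection entry is surfaced to consumers as ordinary end-of-stream, so downstream operators drain and shut down through their normal path. A tiny sketch of that contract (FrameSource and AbortAwareReaderSketch are illustrative assumptions):

    import java.nio.ByteBuffer;

    // Illustrative only: a reader that reports end-of-stream as soon as its source is aborted,
    // mirroring the check added to NonDeterministicFrameReader.
    public final class AbortAwareReaderSketch {
        public interface FrameSource {
            boolean aborted();

            /** Fills the buffer with the next frame; returns false when no more frames exist. */
            boolean read(ByteBuffer frame);
        }

        private final FrameSource source;
        private boolean eos;

        public AbortAwareReaderSketch(FrameSource source) {
            this.source = source;
        }

        public boolean nextFrame(ByteBuffer frame) {
            if (eos) {
                return false;
            }
            if (source.aborted() || !source.read(frame)) {
                // An aborted source is treated exactly like a finished one; the consumer unwinds cleanly.
                eos = true;
                return false;
            }
            return true;
        }
    }
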
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index 4513ca9..4bbd96a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -178,9 +178,9 @@
}
@Override
- public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
- throws Exception {
- jobManager.notifyStageletComplete(jobId, stageId, nodeId, statistics);
+ public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+ StageletStatistics statistics) throws Exception {
+ jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
}
@Override
@@ -255,7 +255,7 @@
}
}
- private void killNode(String nodeId) {
+ private void killNode(String nodeId) throws Exception {
nodeRegistry.remove(nodeId);
jobManager.notifyNodeFailure(nodeId);
}
@@ -272,7 +272,11 @@
}
}
for (String deadNode : deadNodes) {
- killNode(deadNode);
+ try {
+ killNode(deadNode);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
}
@@ -329,19 +333,22 @@
private UUID jobId;
private JobPlan plan;
private UUID stageId;
+ private int attempt;
private Set<ActivityNodeId> tasks;
- public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> tasks) {
+ public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
+ Set<ActivityNodeId> tasks) {
this.nodeId = nodeId;
this.jobId = jobId;
this.plan = plan;
this.stageId = stageId;
+ this.attempt = attempt;
this.tasks = tasks;
}
@Override
public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
- return node.initializeJobletPhase1(jobId, plan, stageId, tasks);
+ return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks);
}
@Override
@@ -446,6 +453,34 @@
}
}
+ static class JobletAborter implements RemoteOp<Void> {
+ private String nodeId;
+ private UUID jobId;
+ private UUID stageId;
+
+ public JobletAborter(String nodeId, UUID jobId, UUID stageId, int attempt) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ this.stageId = stageId;
+ }
+
+ @Override
+ public Void execute(INodeController node) throws Exception {
+ node.abortJoblet(jobId, stageId);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Aborting";
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
static class PortMapMergingAccumulator implements
Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
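
JobletAborter above is one remote operation per node; the controller runs one for every node that hosted the failed attempt. A hedged sketch of that fan-out, with a hypothetical NodeClient standing in for INodeController:

    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Illustrative only: abort a stage attempt on every node that ran it, tolerating nodes
    // that are already dead.
    public final class AbortFanOutSketch {
        public interface NodeClient {
            void abortJoblet(UUID jobId, UUID stageId) throws Exception;
        }

        public static void abortEverywhere(Iterable<NodeClient> nodes, final UUID jobId, final UUID stageId)
                throws InterruptedException {
            ExecutorService executor = Executors.newCachedThreadPool();
            for (final NodeClient node : nodes) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            node.abortJoblet(jobId, stageId);
                        } catch (Exception e) {
                            // A node that has already failed cannot be aborted; log and move on.
                            e.printStackTrace();
                        }
                    }
                });
            }
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

The Overlog side makes the same allowance: abortmessage_agg only includes nodes that are not in failednodes, so acknowledgements are never expected from dead nodes.
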
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
index 7892b5c..a13dc7e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
@@ -30,12 +30,12 @@
public void advanceJob(JobControl jobControlImpl) throws Exception;
- public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
- throws Exception;
+ public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+ StageletStatistics statistics) throws Exception;
public JobStatus getJobStatus(UUID jobId);
public JobStatistics waitForCompletion(UUID jobId) throws Exception;
- public void notifyNodeFailure(String nodeId);
+ public void notifyNodeFailure(String nodeId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 3709753..8917959 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -56,8 +56,6 @@
private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
- private final ClusterControllerService ccs;
-
private final Runtime jolRuntime;
private final JobTable jobTable;
@@ -80,8 +78,13 @@
private final StageletCompleteTable stageletCompleteTable;
- public JOLJobManagerImpl(final ClusterControllerService ccs, Runtime jolRuntime) throws Exception {
- this.ccs = ccs;
+ private final FailedNodesTable failedNodesTable;
+
+ private final AbortMessageTable abortMessageTable;
+
+ private final AbortNotifyTable abortNotifyTable;
+
+ public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception {
this.jolRuntime = jolRuntime;
this.jobTable = new JobTable(jolRuntime);
this.odTable = new OperatorDescriptorTable(jolRuntime);
@@ -93,6 +96,9 @@
this.jobStartTable = new JobStartTable();
this.startMessageTable = new StartMessageTable(jolRuntime);
this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
+ this.failedNodesTable = new FailedNodesTable(jolRuntime);
+ this.abortMessageTable = new AbortMessageTable(jolRuntime);
+ this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
jolRuntime.catalog().register(jobTable);
jolRuntime.catalog().register(odTable);
@@ -104,6 +110,9 @@
jolRuntime.catalog().register(jobStartTable);
jolRuntime.catalog().register(startMessageTable);
jolRuntime.catalog().register(stageletCompleteTable);
+ jolRuntime.catalog().register(failedNodesTable);
+ jolRuntime.catalog().register(abortMessageTable);
+ jolRuntime.catalog().register(abortNotifyTable);
jobTable.register(new JobTable.Callback() {
@Override
@@ -123,6 +132,7 @@
}
+ @SuppressWarnings("unchecked")
@Override
public void insertion(TupleSet tuples) {
try {
@@ -131,15 +141,16 @@
Object[] data = t.toArray();
UUID jobId = (UUID) data[0];
UUID stageId = (UUID) data[1];
- JobPlan plan = (JobPlan) data[2];
- Set<List> ts = (Set<List>) data[3];
+ Integer attempt = (Integer) data[2];
+ JobPlan plan = (JobPlan) data[3];
+ Set<List> ts = (Set<List>) data[4];
ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
.size()];
int i = 0;
for (List t2 : ts) {
Object[] t2Data = t2.toArray();
p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
- plan, stageId, (Set<ActivityNodeId>) t2Data[1]);
+ plan, stageId, attempt, (Set<ActivityNodeId>) t2Data[1]);
}
Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
new ClusterControllerService.PortMapMergingAccumulator());
@@ -170,6 +181,44 @@
}
});
+ abortMessageTable.register(new AbortMessageTable.Callback() {
+ @Override
+ public void deletion(TupleSet tuples) {
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void insertion(TupleSet tuples) {
+ try {
+ synchronized (JOLJobManagerImpl.this) {
+ for (Tuple t : tuples) {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ UUID stageId = (UUID) data[1];
+ Integer attempt = (Integer) data[2];
+ Set<List> ts = (Set<List>) data[4];
+ ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
+ .size()];
+ int i = 0;
+ BasicTupleSet notificationTuples = new BasicTupleSet();
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ String nodeId = (String) t2Data[0];
+ jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId, attempt);
+ notificationTuples.add(AbortNotifyTable.createTuple(jobId, stageId, nodeId, attempt));
+ }
+ ccs.runRemote(jas, null);
+
+ jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples, null);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
jolRuntime.evaluate();
}
@@ -270,15 +319,19 @@
}
@Override
- public void notifyNodeFailure(String nodeId) {
+ public void notifyNodeFailure(String nodeId) throws Exception {
+ BasicTupleSet failedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+ jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, failedTuples, null);
+
+ jolRuntime.evaluate();
}
@Override
- public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
+ public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
StageletStatistics statistics) throws Exception {
BasicTupleSet scTuples = new BasicTupleSet();
- scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, statistics));
+ scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
jolRuntime.schedule(JOL_SCOPE, StageletCompleteTable.TABLE_NAME, scTuples, null);
@@ -314,6 +367,7 @@
private static Key PRIMARY_KEY = new Key(0);
+ @SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class, Set.class
};
@@ -322,10 +376,12 @@
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
+ @SuppressWarnings("unchecked")
static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
}
+ @SuppressWarnings("unchecked")
JobStatistics buildJobStatistics(Tuple jobTuple) {
Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple.value(4);
JobStatistics stats = new JobStatistics();
@@ -358,6 +414,7 @@
private static Key PRIMARY_KEY = new Key(0, 1);
+ @SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
UUID.class, OperatorDescriptorId.class, IOperatorDescriptor.class
};
@@ -379,6 +436,7 @@
private static Key PRIMARY_KEY = new Key();
+ @SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
UUID.class, OperatorDescriptorId.class, String.class
};
@@ -400,6 +458,7 @@
private static Key PRIMARY_KEY = new Key(0, 1);
+ @SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
UUID.class,
ConnectorDescriptorId.class,
@@ -433,6 +492,7 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2);
+ @SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, IActivityNode.class
};
@@ -455,6 +515,7 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+ @SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
UUID.class, OperatorDescriptorId.class, Integer.class, Direction.class, ActivityNodeId.class, Integer.class
};
@@ -477,6 +538,7 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4);
+ @SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
UUID.class,
OperatorDescriptorId.class,
@@ -504,6 +566,7 @@
private static class JobStartTable extends EventTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
+ @SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
UUID.class, Long.class
};
@@ -523,10 +586,11 @@
private static class StartMessageTable extends BasicTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
- private static Key PRIMARY_KEY = new Key(0, 1);
+ private static Key PRIMARY_KEY = new Key(0, 1, 2);
+ @SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
- UUID.class, UUID.class, JobPlan.class, Set.class
+ UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
};
public StartMessageTable(Runtime context) {
@@ -535,23 +599,87 @@
}
/*
- * declare(stageletcomplete, keys(0, 1, 2), {JobId, StageId, NodeId, StageletStatistics})
+ * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
*/
private static class StageletCompleteTable extends BasicTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete");
- private static Key PRIMARY_KEY = new Key(0, 1, 2);
+ private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+ @SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
- UUID.class, UUID.class, String.class, StageletStatistics.class
+ UUID.class, UUID.class, String.class, Integer.class, StageletStatistics.class
};
public StageletCompleteTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
- public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics) {
- return new Tuple(jobId, stageId, nodeId, statistics);
+ public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt,
+ StageletStatistics statistics) {
+ return new Tuple(jobId, stageId, nodeId, attempt, statistics);
+ }
+ }
+
+ /*
+ * declare(failednodes, keys(0), {NodeId})
+ */
+ private static class FailedNodesTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "failednodes");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ String.class
+ };
+
+ public FailedNodesTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(String nodeId) {
+ return new Tuple(nodeId);
+ }
+ }
+
+ /*
+ * declare(abortmessage, keys(0, 1, 2), {JobId, StageId, Attempt, JobPlan, TupleSet})
+ */
+ private static class AbortMessageTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortmessage");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
+ };
+
+ public AbortMessageTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+ }
+
+ /*
+ * declare(abortnotify, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+ */
+ private static class AbortNotifyTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortnotify");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, UUID.class, String.class, Integer.class
+ };
+
+ public AbortNotifyTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+ return new Tuple(jobId, stageId, nodeId, attempt);
}
}
}
\ No newline at end of file
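
The new tables implement a small piece of bookkeeping: remember which nodes were told to abort an attempt (abortmessage), collect their acknowledgements (abortnotify), and consider the abort complete once the two sets match (abortcomplete). The same logic in plain Java, as a self-contained sketch (AbortTrackerSketch and its method names are assumptions):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    // Illustrative only: imperative equivalent of the abortmessage/abortnotify/abortcomplete rules.
    public final class AbortTrackerSketch {
        private static final class Key {
            final UUID jobId;
            final UUID stageId;
            final int attempt;

            Key(UUID jobId, UUID stageId, int attempt) {
                this.jobId = jobId;
                this.stageId = stageId;
                this.attempt = attempt;
            }

            @Override
            public boolean equals(Object o) {
                if (!(o instanceof Key)) {
                    return false;
                }
                Key k = (Key) o;
                return attempt == k.attempt && jobId.equals(k.jobId) && stageId.equals(k.stageId);
            }

            @Override
            public int hashCode() {
                return (jobId.hashCode() * 31 + stageId.hashCode()) * 31 + attempt;
            }
        }

        private final Map<Key, Set<String>> sentTo = new HashMap<Key, Set<String>>();
        private final Map<Key, Set<String>> ackedBy = new HashMap<Key, Set<String>>();

        /** Record that an abort was sent to nodeId for this stage attempt (abortmessage). */
        public synchronized void abortSent(UUID jobId, UUID stageId, int attempt, String nodeId) {
            nodes(sentTo, new Key(jobId, stageId, attempt)).add(nodeId);
        }

        /** Record an acknowledgement (abortnotify); returns true once all acks are in (abortcomplete). */
        public synchronized boolean abortAcknowledged(UUID jobId, UUID stageId, int attempt, String nodeId) {
            Key k = new Key(jobId, stageId, attempt);
            nodes(ackedBy, k).add(nodeId);
            Set<String> sent = sentTo.get(k);
            return sent != null && ackedBy.get(k).containsAll(sent);
        }

        private static Set<String> nodes(Map<Key, Set<String>> m, Key k) {
            Set<String> s = m.get(k);
            if (s == null) {
                s = new HashSet<String>();
                m.put(k, s);
            }
            return s;
        }
    }
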
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
index 11a7bdd..fc902b2 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
@@ -151,7 +151,7 @@
.size()];
int i = 0;
for (String nodeId : participatingNodes) {
- p1is[i++] = new ClusterControllerService.Phase1Installer(nodeId, jobId, plan, stageId, stage.getTasks());
+ p1is[i++] = new ClusterControllerService.Phase1Installer(nodeId, jobId, plan, stageId, 0, stage.getTasks());
}
Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
new ClusterControllerService.PortMapMergingAccumulator());
@@ -179,7 +179,8 @@
return participatingNodes;
}
- public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
+ @Override
+ public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
StageletStatistics statistics) throws Exception {
JobControl jc = jobMap.get(jobId);
if (jc != null) {
@@ -187,11 +188,13 @@
}
}
+ @Override
public synchronized JobStatus getJobStatus(UUID jobId) {
JobControl jc = jobMap.get(jobId);
return jc.getJobStatus();
}
+ @Override
public JobStatistics waitForCompletion(UUID jobId) throws Exception {
JobControl jc;
synchronized (this) {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
index 055216d..2269e1f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
@@ -87,8 +87,8 @@
return nodeController.getExecutor();
}
- public synchronized void notifyStageletComplete(UUID stageId, StageletStatistics stats) throws Exception {
+ public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletStatistics stats) throws Exception {
stageletMap.remove(stageId);
- nodeController.notifyStageComplete(jobId, stageId, stats);
+ nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index cde2ec4..7b595b7 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -161,13 +161,13 @@
}
@Override
- public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId,
+ public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
Set<ActivityNodeId> activities) throws Exception {
LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
final Joblet joblet = getLocalJoblet(jobId);
- Stagelet stagelet = new Stagelet(joblet, stageId, id);
+ Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
joblet.setStagelet(stageId, stagelet);
final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
@@ -197,7 +197,8 @@
IConnectorDescriptor conn = inputs.get(j);
Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
endpointList.add(endpoint);
- DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx);
+ DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId,
+ stageId);
connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
.getTaskInputMap().get(hanId).get(j), i);
@@ -368,8 +369,8 @@
}
}
- public void notifyStageComplete(UUID jobId, UUID stageId, StageletStatistics stats) throws Exception {
- ccs.notifyStageletComplete(jobId, stageId, id, stats);
+ public void notifyStageComplete(UUID jobId, UUID stageId, int attempt, StageletStatistics stats) throws Exception {
+ ccs.notifyStageletComplete(jobId, stageId, attempt, id, stats);
}
@Override
@@ -398,4 +399,16 @@
}
}
}
+
+ @Override
+ public synchronized void abortJoblet(UUID jobId, UUID stageId) throws Exception {
+ Joblet ji = jobletMap.get(jobId);
+ if (ji != null) {
+ Stagelet stagelet = ji.getStagelet(stageId);
+ if (stagelet != null) {
+ stagelet.abort();
+ connectionManager.abortConnections(jobId, stageId);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index 8acab28..8c0b830 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -40,19 +40,24 @@
private final UUID stageId;
+ private final int attempt;
+
private final Map<OperatorInstanceId, OperatorRunnable> honMap;
private List<Endpoint> endpointList;
private boolean started;
+ private volatile boolean abort;
+
private final Set<OperatorInstanceId> pendingOperators;
private final StageletStatistics stats;
- public Stagelet(Joblet joblet, UUID stageId, String nodeId) throws RemoteException {
+ public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId) throws RemoteException {
this.joblet = joblet;
this.stageId = stageId;
+ this.attempt = attempt;
pendingOperators = new HashSet<OperatorInstanceId>();
started = false;
honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
@@ -85,6 +90,13 @@
notifyAll();
}
+ public synchronized void abort() {
+ this.abort = true;
+ for (OperatorRunnable r : honMap.values()) {
+ r.abort();
+ }
+ }
+
public void installRunnable(final OperatorInstanceId opIId) {
pendingOperators.add(opIId);
final OperatorRunnable hon = honMap.get(opIId);
@@ -97,9 +109,12 @@
e.printStackTrace();
return;
}
+ if (abort) {
+ return;
+ }
try {
LOGGER.log(Level.INFO, "Starting runnable for operator: " + joblet.getJobId() + ":" + stageId + ":"
- + opIId.getOperatorId() + ":" + opIId.getPartition());
+ + opIId.getOperatorId() + ":" + opIId.getPartition());
} catch (Exception e) {
e.printStackTrace();
}
@@ -117,7 +132,7 @@
if (pendingOperators.isEmpty()) {
stats.setEndTime(new Date());
try {
- joblet.notifyStageletComplete(stageId, stats);
+ joblet.notifyStageletComplete(stageId, attempt, stats);
} catch (Exception e) {
e.printStackTrace();
}
@@ -125,7 +140,7 @@
}
private synchronized void waitUntilStarted() throws InterruptedException {
- while (!started) {
+ while (!started && !abort) {
wait();
}
}
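
The stagelet's start/abort handshake is a small monitor: installRunnable blocks until either start or abort is observed, then either runs the operator or returns. A self-contained sketch of that latch (StartOrAbortLatchSketch is an assumption, not a Hyracks class; the sketch notifies waiters on abort as well as on start so a pending task wakes promptly):

    // Illustrative only: the cooperative start/abort handshake used by Stagelet, as a tiny
    // self-contained monitor.
    public final class StartOrAbortLatchSketch {
        private boolean started;
        private volatile boolean aborted;

        public synchronized void start() {
            started = true;
            notifyAll();
        }

        public synchronized void abort() {
            aborted = true;
            notifyAll(); // wake any task blocked in awaitStart() so it can observe the abort
        }

        /** Returns true if the stage attempt was started, false if it was aborted first. */
        public synchronized boolean awaitStart() throws InterruptedException {
            while (!started && !aborted) {
                wait();
            }
            return !aborted;
        }
    }
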
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
index 5964d07..149f867 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
@@ -25,6 +25,7 @@
private IOperatorNodePushable opNode;
private IFrameReader reader;
private ByteBuffer buffer;
+ private volatile boolean abort;
public OperatorRunnable(HyracksContext ctx, IOperatorNodePushable opNode) {
this.opNode = opNode;
@@ -39,6 +40,10 @@
this.reader = reader;
}
+ public void abort() {
+ abort = true;
+ }
+
@Override
public void run() {
try {
@@ -46,6 +51,9 @@
if (reader != null) {
reader.open();
while (reader.nextFrame(buffer)) {
+ if (abort) {
+ break;
+ }
buffer.flip();
opNode.nextFrame(buffer);
buffer.compact();
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index 8f62f26..ba7d0f7 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -76,34 +76,42 @@
watch(jobstage, a);
-define(stagestart, keys(0), {UUID, Integer});
+define(stagestart, keys(), {UUID, Integer, Integer});
define(stagefinish, keys(0, 1), {UUID, Integer, Set});
watch(jobstart, i);
-stagestart_INITIAL stagestart(JobId, 0) :-
+stagestart_INITIAL stagestart(JobId, 0, 0) :-
jobstart(JobId, _),
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
- notin stagestart(JobId, _);
+ notin stagestart(JobId, _, _);
update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
jobstart(JobId, _);
-stagestart_NEXT stagestart(JobId, NextStageNumber) :-
- stagestart(JobId, StageNumber),
+stagestart_NEXT stagestart(JobId, NextStageNumber, 0) :-
+ stagestart(JobId, StageNumber, _),
stagefinish#insert(StageId, StageNumber, _)
{
NextStageNumber := StageNumber + 1;
};
+stagestart_AGAIN stagestart(JobId, StageNumber, NextAttempt) :-
+ stagestart(JobId, StageNumber, Attempt),
+ abortcomplete(JobId, StageId, Attempt),
+ jobstage(JobId, StageNumber, StageId)
+ {
+ NextAttempt := Attempt + 1;
+ };
+
watch(stagestart, a);
watch(stagestart, d);
-define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, UUID, String});
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String});
-activitystart(JobId, OperatorId, ActivityId, StageNumber, StageId, NodeId) :-
- stagestart(JobId, StageNumber),
+activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId) :-
+ stagestart(JobId, StageNumber, Attempt),
operatordescriptor(JobId, OperatorId, _),
activitystage(JobId, OperatorId, ActivityId, StageNumber),
jobstage(JobId, StageNumber, StageId),
@@ -111,37 +119,74 @@
watch(activitystart, a);
-define(stageletstart, keys(0, 1, 3), {UUID, UUID, JobPlan, String, Set});
+define(stageletstart, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
-stageletstart(JobId, StageId, JobPlan, NodeId, set<ActivityId>) :-
- activitystart#insert(JobId, _, ActivityId, StageNumber, StageId, NodeId),
+stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityId>) :-
+ activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId),
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _);
watch(stageletstart, a);
watch(stageletstart, i);
-define(startmessage_agg, keys(0, 1), {UUID, UUID, JobPlan, Set});
+define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
-startmessage_agg(JobId, StageId, JobPlan, set<Tuple>) :-
- stageletstart(JobId, StageId, JobPlan, NodeId, ActivityIdSet)
+startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+ stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet)
{
Tuple := [NodeId, ActivityIdSet];
};
-startmessage(JobId, StageId, JobPlan, TSet) :-
- startmessage_agg(JobId, StageId, JobPlan, TSet);
+startmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+ startmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
watch(startmessage, a);
watch(startmessage, i);
-define(stageletcomplete_agg, keys(0, 1), {UUID, UUID, Set});
+define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
-stageletcomplete_agg(JobId, StageId, set<Statistics>) :-
- stageletcomplete(JobId, StageId, NodeId, Statistics);
+stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
+ stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+ failednodes(NodeId);
+
+stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
+ stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+ stageletabort(JobId, StageId, _, _, Attempt, _);
+
+define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+ stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+ notin failednodes(NodeId)
+ {
+ Tuple := [NodeId, ActivityIdSet];
+ };
+
+abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+ abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
+
+watch(abortmessage, a);
+watch(abortmessage, i);
+
+define(abortnotify_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+abortnotify_agg(JobId, StageId, Attempt, set<NodeId>) :-
+ abortnotify(JobId, StageId, NodeId, Attempt);
+
+define(abortcomplete, keys(), {UUID, UUID, Integer});
+
+abortcomplete(JobId, StageId, Attempt) :-
+ abortnotify_agg(JobId, StageId, Attempt, NodeIdSet),
+ abortmessage_agg(JobId, StageId, Attempt, _, TSet),
+ TSet.size() == NodeIdSet.size();
+
+define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
+ stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
stagefinish(JobId, StageNumber, SSet) :-
- startmessage_agg(JobId, StageId, _, TSet),
- stageletcomplete_agg(JobId, StageId, SSet),
+ startmessage_agg(JobId, StageId, Attempt, _, TSet),
+ stageletcomplete_agg(JobId, StageId, Attempt, SSet),
jobstage(JobId, StageNumber, StageId),
TSet.size() == SSet.size();