Added code for fault tolerance

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@19 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
index 125c1aa..9e3cbbe 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
@@ -17,6 +17,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
+import java.util.UUID;
 
 public interface IConnectionEntry {
     ByteBuffer getReadBuffer();
@@ -32,4 +33,16 @@
     void close() throws IOException;
 
     void write(ByteBuffer buffer);
+
+    UUID getJobId();
+
+    UUID getStageId();
+
+    void setJobId(UUID jobId);
+
+    void setStageId(UUID stageId);
+
+    boolean aborted();
+
+    void abort();
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
index 91f921c..7e74aee 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
@@ -18,4 +18,8 @@
 
 public interface IDataReceiveListenerFactory {
     public IDataReceiveListener getDataReceiveListener(UUID endpointUUID, IConnectionEntry entry, int senderIndex);
+
+    public UUID getJobId();
+
+    public UUID getStageId();
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
index 19e81f1..81b16d9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
@@ -30,8 +30,8 @@
 
     public void unregisterNode(INodeController nodeController) throws Exception;
 
-    public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
-            throws Exception;
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+        StageletStatistics statistics) throws Exception;
 
     public void nodeHeartbeat(String id) throws Exception;
 
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
index e7152f9..d349edf 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
@@ -32,7 +32,7 @@
 
     public NodeCapability getNodeCapability() throws Exception;
 
-    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId,
+    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
         Set<ActivityNodeId> activities) throws Exception;
 
     public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> activities,
@@ -40,6 +40,8 @@
 
     public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception;
 
+    public void abortJoblet(UUID jobId, UUID stageId) throws Exception;
+
     public void cleanUpJob(UUID jobId) throws Exception;
 
     public void startStage(UUID jobId, UUID stageId) throws Exception;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
index 4d9c309..89f7566 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -18,10 +18,10 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.hyracks.api.comm.FrameConstants;
 import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
 import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
 import edu.uci.ics.hyracks.context.HyracksContext;
@@ -41,6 +41,12 @@
 
     private final SelectionKey key;
 
+    private UUID jobId;
+
+    private UUID stageId;
+
+    private boolean aborted;
+
     public ConnectionEntry(HyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
         this.socketChannel = socketChannel;
         readBuffer = ctx.getResourceManager().allocateFrame();
@@ -55,6 +61,9 @@
     }
 
     public boolean dispatch(SelectionKey key) throws IOException {
+        if (aborted) {
+            recvListener.dataReceived(this);
+        }
         if (key.isReadable()) {
             if (LOGGER.isLoggable(Level.FINER)) {
                 LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
@@ -135,12 +144,46 @@
     }
 
     @Override
-    public void close() throws IOException {
-        socketChannel.close();
+    public void close() {
+        // Best-effort close: a failure is logged and not propagated, so callers
+        // tearing a connection down never have to handle it.
+        try {
+            socketChannel.close();
+        } catch (IOException e) {
+            // Use the class logger (as dispatch() does) instead of writing to stderr.
+            LOGGER.log(Level.WARNING, "Error closing socket channel", e);
+        }
     }
 
     @Override
     public SelectionKey getSelectionKey() {
         return key;
     }
+
+    @Override
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public void setJobId(UUID jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public UUID getStageId() {
+        return stageId;
+    }
+
+    @Override
+    public void setStageId(UUID stageId) {
+        this.stageId = stageId;
+    }
+
+    @Override
+    public void abort() {
+        aborted = true;
+    }
+
+    @Override
+    public boolean aborted() {
+        return aborted;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
index bfca6ad..464a95c 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
@@ -26,9 +26,11 @@
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -62,6 +64,8 @@
 
     private final IDataReceiveListener initialDataReceiveListener;
 
+    private final Set<IConnectionEntry> connections;
+
     private volatile boolean stopped;
 
     private ByteBuffer emptyFrame;
@@ -76,7 +80,7 @@
 
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Connection manager listening on " + serverSocket.getInetAddress() + ":"
-                    + serverSocket.getLocalPort());
+                + serverSocket.getLocalPort());
         }
 
         pendingConnectionReceivers = new HashMap<UUID, IDataReceiveListenerFactory>();
@@ -85,6 +89,7 @@
         initialDataReceiveListener = new InitialDataReceiveListener();
         emptyFrame = ctx.getResourceManager().allocateFrame();
         emptyFrame.putInt(FrameHelper.getTupleCountOffset(ctx), 0);
+        connections = new HashSet<IConnectionEntry>();
     }
 
     public synchronized void dumpStats() {
@@ -116,7 +121,7 @@
     public IFrameWriter connect(NetworkAddress address, UUID id, int senderId) throws HyracksDataException {
         try {
             SocketChannel channel = SocketChannel
-                    .open(new InetSocketAddress(address.getIpAddress(), address.getPort()));
+                .open(new InetSocketAddress(address.getIpAddress(), address.getPort()));
             byte[] initialFrame = new byte[INITIAL_MESSAGE_LEN];
             ByteBuffer buffer = ByteBuffer.wrap(initialFrame);
             buffer.clear();
@@ -173,6 +178,20 @@
         pendingConnectionReceivers.remove(id);
     }
 
+    /**
+     * Queues every registered connection that belongs to the given job stage for
+     * abortion. The data listener thread performs the actual abort; this method
+     * only hands the matching entries over and wakes the selector.
+     *
+     * @param jobId   job whose connections should be aborted
+     * @param stageId stage within that job
+     */
+    public void abortConnections(UUID jobId, UUID stageId) {
+        List<IConnectionEntry> abortConnections = new ArrayList<IConnectionEntry>();
+        // Hold the manager lock only for the scan. The listener thread takes its
+        // own monitor first and ConnectionManager.this second, so entering the
+        // listener monitor while still holding the manager lock could deadlock.
+        synchronized (this) {
+            for (IConnectionEntry ce : connections) {
+                if (ce.getJobId().equals(jobId) && ce.getStageId().equals(stageId)) {
+                    abortConnections.add(ce);
+                }
+            }
+        }
+        if (abortConnections.isEmpty()) {
+            return;
+        }
+        synchronized (dataListenerThread) {
+            dataListenerThread.pendingAbortConnections.addAll(abortConnections);
+        }
+        // The listener may be blocked in select(); wake it so the abort is handled promptly.
+        dataListenerThread.selector.wakeup();
+    }
+
     private final class NetworkFrameWriter implements IFrameWriter {
         private SocketChannel channel;
 
@@ -237,7 +256,8 @@
     private final class DataListenerThread extends Thread {
         private Selector selector;
 
-        private List<SocketChannel> pendingSockets;
+        private List<SocketChannel> pendingNewSockets;
+        private List<IConnectionEntry> pendingAbortConnections;
 
         public DataListenerThread() {
             super("Hyracks Data Listener Thread");
@@ -246,12 +266,13 @@
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
-            pendingSockets = new ArrayList<SocketChannel>();
+            pendingNewSockets = new ArrayList<SocketChannel>();
+            pendingAbortConnections = new ArrayList<IConnectionEntry>();
         }
 
         synchronized void addSocketChannel(SocketChannel sc) throws IOException {
             LOGGER.info("Connection received");
-            pendingSockets.add(sc);
+            pendingNewSockets.add(sc);
             selector.wakeup();
         }
 
@@ -264,8 +285,8 @@
                     }
                     int n = selector.select();
                     synchronized (this) {
-                        if (!pendingSockets.isEmpty()) {
-                            for (SocketChannel sc : pendingSockets) {
+                        if (!pendingNewSockets.isEmpty()) {
+                            for (SocketChannel sc : pendingNewSockets) {
                                 sc.configureBlocking(false);
                                 SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
                                 ConnectionEntry entry = new ConnectionEntry(ctx, sc, scKey);
@@ -275,7 +296,19 @@
                                     LOGGER.fine("Woke up selector");
                                 }
                             }
-                            pendingSockets.clear();
+                            pendingNewSockets.clear();
+                        }
+                        // Abort requests queued by abortConnections(): flag each entry, let its
+                        // listener observe the abort, then tear the connection down.
+                        if (!pendingAbortConnections.isEmpty()) {
+                            for (IConnectionEntry ce : pendingAbortConnections) {
+                                SelectionKey key = ce.getSelectionKey();
+                                ce.abort();
+                                // dispatch() invokes the receive listener when the entry is aborted.
+                                ((ConnectionEntry) ce).dispatch(key);
+                                key.cancel();
+                                ce.close();
+                                synchronized (ConnectionManager.this) {
+                                    connections.remove(ce);
+                                }
+                            }
+                            // Drain the queue, as done for pendingNewSockets; otherwise the same
+                            // entries would be aborted and closed again on every selector pass.
+                            pendingAbortConnections.clear();
                         }
                         if (LOGGER.isLoggable(Level.FINE)) {
                             LOGGER.fine("Selector: " + n);
@@ -295,6 +328,9 @@
                                 if (close) {
                                     key.cancel();
                                     entry.close();
+                                    synchronized (ConnectionManager.this) {
+                                        connections.remove(entry);
+                                    }
                                 }
                             }
                         }
@@ -331,6 +367,11 @@
 
                 newListener = connectionReceiver.getDataReceiveListener(endpointID, entry, senderId);
                 entry.setDataReceiveListener(newListener);
+                entry.setJobId(connectionReceiver.getJobId());
+                entry.setStageId(connectionReceiver.getStageId());
+                synchronized (ConnectionManager.this) {
+                    connections.add(entry);
+                }
                 byte[] ack = new byte[4];
                 ByteBuffer ackBuffer = ByteBuffer.wrap(ack);
                 ackBuffer.clear();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
index 3eb2280..2cfcb94 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
@@ -32,7 +32,7 @@
 import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class DemuxDataReceiveListenerFactory implements IDataReceiveListenerFactory, IConnectionDemultiplexer,
-        IDataReceiveListener {
+    IDataReceiveListener {
     private static final Logger LOGGER = Logger.getLogger(DemuxDataReceiveListenerFactory.class.getName());
 
     private final NonDeterministicFrameReader frameReader;
@@ -40,10 +40,14 @@
     private final BitSet readyBits;
     private IConnectionEntry senders[];
     private int openSenderCount;
+    private UUID jobId;
+    private UUID stageId;
 
-    public DemuxDataReceiveListenerFactory(HyracksContext ctx) {
+    public DemuxDataReceiveListenerFactory(HyracksContext ctx, UUID jobId, UUID stageId) {
         frameReader = new NonDeterministicFrameReader(ctx, this);
         this.ctx = ctx;
+        this.jobId = jobId;
+        this.stageId = stageId;
         readyBits = new BitSet();
         senders = null;
         openSenderCount = 0;
@@ -66,7 +70,7 @@
         ByteBuffer buffer = entry.getReadBuffer();
         buffer.flip();
         int dataLen = buffer.remaining();
-        if (dataLen >= ctx.getFrameSize()) {
+        if (dataLen >= ctx.getFrameSize() || entry.aborted()) {
             if (LOGGER.isLoggable(Level.FINEST)) {
                 LOGGER.finest("NonDeterministicDataReceiveListener: frame received: sender = " + senderIndex);
             }
@@ -139,4 +143,14 @@
     public synchronized int getSenderCount() {
         return senders.length;
     }
+
+    @Override
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public UUID getStageId() {
+        return stageId;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
index 3f2d449..91350735 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
@@ -55,6 +55,10 @@
         }
         while (true) {
             IConnectionEntry entry = demux.findNextReadyEntry(lastReadSender);
+            if (entry.aborted()) {
+                eos = true;
+                return false;
+            }
             lastReadSender = (Integer) entry.getAttachment();
             ByteBuffer netBuffer = entry.getReadBuffer();
             int tupleCount = netBuffer.getInt(FrameHelper.getTupleCountOffset(ctx));
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index 4513ca9..4bbd96a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -178,9 +178,9 @@
     }
 
     @Override
-    public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
-        throws Exception {
-        jobManager.notifyStageletComplete(jobId, stageId, nodeId, statistics);
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+        StageletStatistics statistics) throws Exception {
+        jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
     }
 
     @Override
@@ -255,7 +255,7 @@
         }
     }
 
-    private void killNode(String nodeId) {
+    private void killNode(String nodeId) throws Exception {
         nodeRegistry.remove(nodeId);
         jobManager.notifyNodeFailure(nodeId);
     }
@@ -272,7 +272,11 @@
                     }
                 }
                 for (String deadNode : deadNodes) {
-                    killNode(deadNode);
+                    try {
+                        killNode(deadNode);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
                 }
             }
         }
@@ -329,19 +333,22 @@
         private UUID jobId;
         private JobPlan plan;
         private UUID stageId;
+        private int attempt;
         private Set<ActivityNodeId> tasks;
 
-        public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> tasks) {
+        public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
+            Set<ActivityNodeId> tasks) {
             this.nodeId = nodeId;
             this.jobId = jobId;
             this.plan = plan;
             this.stageId = stageId;
+            this.attempt = attempt;
             this.tasks = tasks;
         }
 
         @Override
         public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
-            return node.initializeJobletPhase1(jobId, plan, stageId, tasks);
+            return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks);
         }
 
         @Override
@@ -446,6 +453,34 @@
         }
     }
 
+    /**
+     * Remote operation that asks one node controller to abort its joblet for a
+     * given job stage.
+     */
+    static class JobletAborter implements RemoteOp<Void> {
+        private final String nodeId;
+        private final UUID jobId;
+        private final UUID stageId;
+
+        /**
+         * @param nodeId  id of the node this operation targets
+         * @param jobId   job to abort
+         * @param stageId stage of the job to abort
+         * @param attempt accepted but not retained; abortJoblet() takes no attempt number
+         */
+        public JobletAborter(String nodeId, UUID jobId, UUID stageId, int attempt) {
+            this.stageId = stageId;
+            this.jobId = jobId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        /** Invokes abortJoblet on the node; there is no result to report. */
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.abortJoblet(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return String.valueOf(jobId) + " Aborting";
+        }
+    }
+
     static class PortMapMergingAccumulator implements
         Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
         Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
index 7892b5c..a13dc7e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
@@ -30,12 +30,12 @@
 
     public void advanceJob(JobControl jobControlImpl) throws Exception;
 
-    public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
-            throws Exception;
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+        StageletStatistics statistics) throws Exception;
 
     public JobStatus getJobStatus(UUID jobId);
 
     public JobStatistics waitForCompletion(UUID jobId) throws Exception;
 
-    public void notifyNodeFailure(String nodeId);
+    public void notifyNodeFailure(String nodeId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 3709753..8917959 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -56,8 +56,6 @@
 
     private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
 
-    private final ClusterControllerService ccs;
-
     private final Runtime jolRuntime;
 
     private final JobTable jobTable;
@@ -80,8 +78,13 @@
 
     private final StageletCompleteTable stageletCompleteTable;
 
-    public JOLJobManagerImpl(final ClusterControllerService ccs, Runtime jolRuntime) throws Exception {
-        this.ccs = ccs;
+    private final FailedNodesTable failedNodesTable;
+
+    private final AbortMessageTable abortMessageTable;
+
+    private final AbortNotifyTable abortNotifyTable;
+
+    public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception {
         this.jolRuntime = jolRuntime;
         this.jobTable = new JobTable(jolRuntime);
         this.odTable = new OperatorDescriptorTable(jolRuntime);
@@ -93,6 +96,9 @@
         this.jobStartTable = new JobStartTable();
         this.startMessageTable = new StartMessageTable(jolRuntime);
         this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
+        this.failedNodesTable = new FailedNodesTable(jolRuntime);
+        this.abortMessageTable = new AbortMessageTable(jolRuntime);
+        this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
 
         jolRuntime.catalog().register(jobTable);
         jolRuntime.catalog().register(odTable);
@@ -104,6 +110,9 @@
         jolRuntime.catalog().register(jobStartTable);
         jolRuntime.catalog().register(startMessageTable);
         jolRuntime.catalog().register(stageletCompleteTable);
+        jolRuntime.catalog().register(failedNodesTable);
+        jolRuntime.catalog().register(abortMessageTable);
+        jolRuntime.catalog().register(abortNotifyTable);
 
         jobTable.register(new JobTable.Callback() {
             @Override
@@ -123,6 +132,7 @@
 
             }
 
+            @SuppressWarnings("unchecked")
             @Override
             public void insertion(TupleSet tuples) {
                 try {
@@ -131,15 +141,16 @@
                             Object[] data = t.toArray();
                             UUID jobId = (UUID) data[0];
                             UUID stageId = (UUID) data[1];
-                            JobPlan plan = (JobPlan) data[2];
-                            Set<List> ts = (Set<List>) data[3];
+                            Integer attempt = (Integer) data[2];
+                            JobPlan plan = (JobPlan) data[3];
+                            Set<List> ts = (Set<List>) data[4];
                             ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
                                 .size()];
                             int i = 0;
                             for (List t2 : ts) {
                                 Object[] t2Data = t2.toArray();
                                 p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
-                                    plan, stageId, (Set<ActivityNodeId>) t2Data[1]);
+                                    plan, stageId, attempt, (Set<ActivityNodeId>) t2Data[1]);
                             }
                             Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
                                 new ClusterControllerService.PortMapMergingAccumulator());
@@ -170,6 +181,44 @@
             }
         });
 
+        abortMessageTable.register(new AbortMessageTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                try {
+                    synchronized (JOLJobManagerImpl.this) {
+                        for (Tuple t : tuples) {
+                            Object[] data = t.toArray();
+                            UUID jobId = (UUID) data[0];
+                            UUID stageId = (UUID) data[1];
+                            Integer attempt = (Integer) data[2];
+                            Set<List> ts = (Set<List>) data[4];
+                            ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
+                                .size()];
+                            int i = 0;
+                            BasicTupleSet notificationTuples = new BasicTupleSet();
+                            for (List t2 : ts) {
+                                Object[] t2Data = t2.toArray();
+                                String nodeId = (String) t2Data[0];
+                                jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId, attempt);
+                                notificationTuples.add(AbortNotifyTable.createTuple(jobId, stageId, nodeId, attempt));
+                            }
+                            ccs.runRemote(jas, null);
+
+                            jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples, null);
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
         jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
         jolRuntime.evaluate();
     }
@@ -270,15 +319,19 @@
     }
 
     @Override
-    public void notifyNodeFailure(String nodeId) {
+    public synchronized void notifyNodeFailure(String nodeId) throws Exception {
+        // Synchronized like notifyStageletComplete(): both schedule tuples on the shared
+        // JOL runtime, and the table callbacks lock this manager as well.
+        BasicTupleSet failedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
 
+        jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, failedTuples, null);
+
+        jolRuntime.evaluate();
     }
 
     @Override
-    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
+    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
         StageletStatistics statistics) throws Exception {
         BasicTupleSet scTuples = new BasicTupleSet();
-        scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, statistics));
+        scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
 
         jolRuntime.schedule(JOL_SCOPE, StageletCompleteTable.TABLE_NAME, scTuples, null);
 
@@ -314,6 +367,7 @@
 
         private static Key PRIMARY_KEY = new Key(0);
 
+        @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
             UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class, Set.class
         };
@@ -322,10 +376,12 @@
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
 
+        @SuppressWarnings("unchecked")
         static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
             return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
         }
 
+        @SuppressWarnings("unchecked")
         JobStatistics buildJobStatistics(Tuple jobTuple) {
             Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple.value(4);
             JobStatistics stats = new JobStatistics();
@@ -358,6 +414,7 @@
 
         private static Key PRIMARY_KEY = new Key(0, 1);
 
+        @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
             UUID.class, OperatorDescriptorId.class, IOperatorDescriptor.class
         };
@@ -379,6 +436,7 @@
 
         private static Key PRIMARY_KEY = new Key();
 
+        @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
             UUID.class, OperatorDescriptorId.class, String.class
         };
@@ -400,6 +458,7 @@
 
         private static Key PRIMARY_KEY = new Key(0, 1);
 
+        @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
             UUID.class,
             ConnectorDescriptorId.class,
@@ -433,6 +492,7 @@
 
         private static Key PRIMARY_KEY = new Key(0, 1, 2);
 
+        @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
             UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, IActivityNode.class
         };
@@ -455,6 +515,7 @@
 
         private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
 
+        @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
             UUID.class, OperatorDescriptorId.class, Integer.class, Direction.class, ActivityNodeId.class, Integer.class
         };
@@ -477,6 +538,7 @@
 
         private static Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4);
 
+        @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
             UUID.class,
             OperatorDescriptorId.class,
@@ -504,6 +566,7 @@
     private static class JobStartTable extends EventTable {
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
 
+        @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
             UUID.class, Long.class
         };
@@ -523,10 +586,11 @@
     private static class StartMessageTable extends BasicTable {
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
 
-        private static Key PRIMARY_KEY = new Key(0, 1);
+        private static Key PRIMARY_KEY = new Key(0, 1, 2);
 
+        @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
-            UUID.class, UUID.class, JobPlan.class, Set.class
+            UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
         };
 
         public StartMessageTable(Runtime context) {
@@ -535,23 +599,87 @@
     }
 
     /*
-     * declare(stageletcomplete, keys(0, 1, 2), {JobId, StageId, NodeId, StageletStatistics})
+     * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
      */
     private static class StageletCompleteTable extends BasicTable {
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete");
 
-        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
 
+        @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
-            UUID.class, UUID.class, String.class, StageletStatistics.class
+            UUID.class, UUID.class, String.class, Integer.class, StageletStatistics.class
         };
 
         public StageletCompleteTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
 
-        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics) {
-            return new Tuple(jobId, stageId, nodeId, statistics);
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt,
+            StageletStatistics statistics) {
+            return new Tuple(jobId, stageId, nodeId, attempt, statistics);
+        }
+    }
+
+    /*
+     * declare(failednodes, keys(0), {NodeId})
+     */
+    private static class FailedNodesTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "failednodes");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            String.class
+        };
+
+        public FailedNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId) {
+            return new Tuple(nodeId);
+        }
+    }
+
+    /*
+     * declare(abortmessage, keys(0, 1, 2), {JobId, StageId, Attempt, JobPlan, TupleSet})
+     */
+    private static class AbortMessageTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortmessage");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
+        };
+
+        public AbortMessageTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+    /*
+     * declare(abortnotify, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+     */
+    private static class AbortNotifyTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortnotify");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, UUID.class, String.class, Integer.class
+        };
+
+        public AbortNotifyTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+            return new Tuple(jobId, stageId, nodeId, attempt);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
index 11a7bdd..fc902b2 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
@@ -151,7 +151,7 @@
             .size()];
         int i = 0;
         for (String nodeId : participatingNodes) {
-            p1is[i++] = new ClusterControllerService.Phase1Installer(nodeId, jobId, plan, stageId, stage.getTasks());
+            p1is[i++] = new ClusterControllerService.Phase1Installer(nodeId, jobId, plan, stageId, 0, stage.getTasks());
         }
         Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
             new ClusterControllerService.PortMapMergingAccumulator());
@@ -179,7 +179,8 @@
         return participatingNodes;
     }
 
-    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
+    @Override
+    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
         StageletStatistics statistics) throws Exception {
         JobControl jc = jobMap.get(jobId);
         if (jc != null) {
@@ -187,11 +188,13 @@
         }
     }
 
+    @Override
     public synchronized JobStatus getJobStatus(UUID jobId) {
         JobControl jc = jobMap.get(jobId);
         return jc.getJobStatus();
     }
 
+    @Override
     public JobStatistics waitForCompletion(UUID jobId) throws Exception {
         JobControl jc;
         synchronized (this) {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
index 055216d..2269e1f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
@@ -87,8 +87,8 @@
         return nodeController.getExecutor();
     }
 
-    public synchronized void notifyStageletComplete(UUID stageId, StageletStatistics stats) throws Exception {
+    public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletStatistics stats) throws Exception {
         stageletMap.remove(stageId);
-        nodeController.notifyStageComplete(jobId, stageId, stats);
+        nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index cde2ec4..7b595b7 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -161,13 +161,13 @@
     }
 
     @Override
-    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId,
+    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
         Set<ActivityNodeId> activities) throws Exception {
         LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
 
         final Joblet joblet = getLocalJoblet(jobId);
 
-        Stagelet stagelet = new Stagelet(joblet, stageId, id);
+        Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
         joblet.setStagelet(stageId, stagelet);
 
         final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
@@ -197,7 +197,8 @@
                             IConnectorDescriptor conn = inputs.get(j);
                             Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
                             endpointList.add(endpoint);
-                            DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx);
+                            DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId,
+                                stageId);
                             connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
                             PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
                                 .getTaskInputMap().get(hanId).get(j), i);
@@ -368,8 +369,8 @@
         }
     }
 
-    public void notifyStageComplete(UUID jobId, UUID stageId, StageletStatistics stats) throws Exception {
-        ccs.notifyStageletComplete(jobId, stageId, id, stats);
+    public void notifyStageComplete(UUID jobId, UUID stageId, int attempt, StageletStatistics stats) throws Exception {
+        ccs.notifyStageletComplete(jobId, stageId, attempt, id, stats);
     }
 
     @Override
@@ -398,4 +399,16 @@
             }
         }
     }
+
+    @Override
+    public synchronized void abortJoblet(UUID jobId, UUID stageId) throws Exception {
+        Joblet ji = jobletMap.get(jobId);
+        if (ji != null) {
+            Stagelet stagelet = ji.getStagelet(stageId);
+            if (stagelet != null) {
+                stagelet.abort();
+                connectionManager.abortConnections(jobId, stageId);
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index 8acab28..8c0b830 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -40,19 +40,24 @@
 
     private final UUID stageId;
 
+    private final int attempt;
+
     private final Map<OperatorInstanceId, OperatorRunnable> honMap;
 
     private List<Endpoint> endpointList;
 
     private boolean started;
 
+    private volatile boolean abort;
+
     private final Set<OperatorInstanceId> pendingOperators;
 
     private final StageletStatistics stats;
 
-    public Stagelet(Joblet joblet, UUID stageId, String nodeId) throws RemoteException {
+    public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId) throws RemoteException {
         this.joblet = joblet;
         this.stageId = stageId;
+        this.attempt = attempt;
         pendingOperators = new HashSet<OperatorInstanceId>();
         started = false;
         honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
@@ -85,6 +90,20 @@
         notifyAll();
     }
 
+    /**
+     * Flags this stagelet as aborted and forwards the abort to every operator runnable registered in
+     * {@code honMap}.
+     */
+    public synchronized void abort() {
+        this.abort = true;
+        for (OperatorRunnable r : honMap.values()) {
+            r.abort();
+        }
+        // waitUntilStarted() loops on (!started && !abort) around wait(); without a notification the
+        // waiting threads never re-evaluate that condition after the flag is set.
+        notifyAll();
+    }
+
     public void installRunnable(final OperatorInstanceId opIId) {
         pendingOperators.add(opIId);
         final OperatorRunnable hon = honMap.get(opIId);
@@ -97,9 +109,12 @@
                     e.printStackTrace();
                     return;
                 }
+                if (abort) {
+                    return;
+                }
                 try {
                     LOGGER.log(Level.INFO, "Starting runnable for operator: " + joblet.getJobId() + ":" + stageId + ":"
-                            + opIId.getOperatorId() + ":" + opIId.getPartition());
+                        + opIId.getOperatorId() + ":" + opIId.getPartition());
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -117,7 +132,7 @@
         if (pendingOperators.isEmpty()) {
             stats.setEndTime(new Date());
             try {
-                joblet.notifyStageletComplete(stageId, stats);
+                joblet.notifyStageletComplete(stageId, attempt, stats);
             } catch (Exception e) {
                 e.printStackTrace();
             }
@@ -125,7 +140,7 @@
     }
 
     private synchronized void waitUntilStarted() throws InterruptedException {
-        while (!started) {
+        while (!started && !abort) {
             wait();
         }
     }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
index 5964d07..149f867 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
@@ -25,6 +25,7 @@
     private IOperatorNodePushable opNode;
     private IFrameReader reader;
     private ByteBuffer buffer;
+    private volatile boolean abort;
 
     public OperatorRunnable(HyracksContext ctx, IOperatorNodePushable opNode) {
         this.opNode = opNode;
@@ -39,6 +40,10 @@
         this.reader = reader;
     }
 
+    public void abort() {
+        abort = true;
+    }
+
     @Override
     public void run() {
         try {
@@ -46,6 +51,9 @@
             if (reader != null) {
                 reader.open();
                 while (reader.nextFrame(buffer)) {
+                    if (abort) {
+                        break;
+                    }
                     buffer.flip();
                     opNode.nextFrame(buffer);
                     buffer.compact();
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index 8f62f26..ba7d0f7 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -76,34 +76,42 @@
 
 watch(jobstage, a);
 
-define(stagestart, keys(0), {UUID, Integer});
+define(stagestart, keys(), {UUID, Integer, Integer});
 define(stagefinish, keys(0, 1), {UUID, Integer, Set});
 
 watch(jobstart, i);
 
-stagestart_INITIAL stagestart(JobId, 0) :-
+stagestart_INITIAL stagestart(JobId, 0, 0) :-
     jobstart(JobId, _),
     job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
-    notin stagestart(JobId, _);
+    notin stagestart(JobId, _, _);
 
 update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
     job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
     jobstart(JobId, _);
 
-stagestart_NEXT stagestart(JobId, NextStageNumber) :-
-    stagestart(JobId, StageNumber),
+stagestart_NEXT stagestart(JobId, NextStageNumber, 0) :-
+    stagestart(JobId, StageNumber, _),
     stagefinish#insert(StageId, StageNumber, _)
     {
         NextStageNumber := StageNumber + 1;
     };
 
+stagestart_AGAIN stagestart(JobId, StageNumber, NextAttempt) :-
+    stagestart(JobId, StageNumber, Attempt),
+    abortcomplete(JobId, StageId, Attempt),
+    jobstage(JobId, StageNumber, StageId)
+    {
+        NextAttempt := Attempt + 1;
+    };
+
 watch(stagestart, a);
 watch(stagestart, d);
 
-define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, UUID, String});
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String});
 
-activitystart(JobId, OperatorId, ActivityId, StageNumber, StageId, NodeId) :-
-    stagestart(JobId, StageNumber),
+activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId) :-
+    stagestart(JobId, StageNumber, Attempt),
     operatordescriptor(JobId, OperatorId, _),
     activitystage(JobId, OperatorId, ActivityId, StageNumber),
     jobstage(JobId, StageNumber, StageId),
@@ -111,37 +119,74 @@
 
 watch(activitystart, a);
 
-define(stageletstart, keys(0, 1, 3), {UUID, UUID, JobPlan, String, Set});
+define(stageletstart, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
 
-stageletstart(JobId, StageId, JobPlan, NodeId, set<ActivityId>) :-
-    activitystart#insert(JobId, _, ActivityId, StageNumber, StageId, NodeId),
+stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityId>) :-
+    activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId),
     job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _);
 
 watch(stageletstart, a);
 watch(stageletstart, i);
 
-define(startmessage_agg, keys(0, 1), {UUID, UUID, JobPlan, Set});
+define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
 
-startmessage_agg(JobId, StageId, JobPlan, set<Tuple>) :-
-    stageletstart(JobId, StageId, JobPlan, NodeId, ActivityIdSet)
+startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet)
     {
         Tuple := [NodeId, ActivityIdSet];
     };
 
-startmessage(JobId, StageId, JobPlan, TSet) :-
-    startmessage_agg(JobId, StageId, JobPlan, TSet);
+startmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+    startmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
 
 watch(startmessage, a);
 watch(startmessage, i);
 
-define(stageletcomplete_agg, keys(0, 1), {UUID, UUID, Set});
+define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
 
-stageletcomplete_agg(JobId, StageId, set<Statistics>) :-
-    stageletcomplete(JobId, StageId, NodeId, Statistics);
+stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
+    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+    failednodes(NodeId);
+
+stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
+    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+    stageletabort(JobId, StageId, _, _, Attempt, _);
+
+define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+    stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+    notin failednodes(NodeId)
+    {
+        Tuple := [NodeId, ActivityIdSet];
+    };
+
+abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+    abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
+
+watch(abortmessage, a);
+watch(abortmessage, i);
+
+define(abortnotify_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+abortnotify_agg(JobId, StageId, Attempt, set<NodeId>) :-
+    abortnotify(JobId, StageId, NodeId, Attempt);
+
+define(abortcomplete, keys(), {UUID, UUID, Integer});
+
+abortcomplete(JobId, StageId, Attempt) :-
+    abortnotify_agg(JobId, StageId, Attempt, NodeIdSet),
+    abortmessage_agg(JobId, StageId, Attempt, _, TSet),
+    TSet.size() == NodeIdSet.size();
+
+define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
+    stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
 
 stagefinish(JobId, StageNumber, SSet) :-
-    startmessage_agg(JobId, StageId, _, TSet),
-    stageletcomplete_agg(JobId, StageId, SSet),
+    startmessage_agg(JobId, StageId, Attempt, _, TSet),
+    stageletcomplete_agg(JobId, StageId, Attempt, SSet),
     jobstage(JobId, StageNumber, StageId),
     TSet.size() == SSet.size();