Execption raised by a operator is not handled correctly and hence the job continues to be in a running state. 

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@153 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index d6c3ee2..587fd01 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -32,139 +32,145 @@
 import edu.uci.ics.hyracks.control.nc.runtime.OperatorRunnable;
 
 public class Stagelet {
-    private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 1L;
 
-    private static final Logger LOGGER = Logger.getLogger(Stagelet.class.getName());
+	private static final Logger LOGGER = Logger.getLogger(Stagelet.class
+			.getName());
 
-    private final Joblet joblet;
+	private final Joblet joblet;
 
-    private final UUID stageId;
+	private final UUID stageId;
 
-    private final int attempt;
+	private final int attempt;
 
-    private final Map<OperatorInstanceId, OperatorRunnable> honMap;
+	private final Map<OperatorInstanceId, OperatorRunnable> honMap;
 
-    private List<Endpoint> endpointList;
+	private List<Endpoint> endpointList;
 
-    private boolean started;
+	private boolean started;
 
-    private volatile boolean abort;
+	private volatile boolean abort;
 
-    private final Set<OperatorInstanceId> pendingOperators;
+	private final Set<OperatorInstanceId> pendingOperators;
 
-    private final StageletStatistics stats;
+	private final StageletStatistics stats;
 
-    public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId) throws RemoteException {
-        this.joblet = joblet;
-        this.stageId = stageId;
-        this.attempt = attempt;
-        pendingOperators = new HashSet<OperatorInstanceId>();
-        started = false;
-        honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
-        stats = new StageletStatistics();
-        stats.setNodeId(nodeId);
-    }
+	public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId)
+			throws RemoteException {
+		this.joblet = joblet;
+		this.stageId = stageId;
+		this.attempt = attempt;
+		pendingOperators = new HashSet<OperatorInstanceId>();
+		started = false;
+		honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
+		stats = new StageletStatistics();
+		stats.setNodeId(nodeId);
+	}
 
-    public void setOperator(OperatorDescriptorId odId, int partition, OperatorRunnable hon) {
-        honMap.put(new OperatorInstanceId(odId, partition), hon);
-    }
+	public void setOperator(OperatorDescriptorId odId, int partition,
+			OperatorRunnable hon) {
+		honMap.put(new OperatorInstanceId(odId, partition), hon);
+	}
 
-    public Map<OperatorInstanceId, OperatorRunnable> getOperatorMap() {
-        return honMap;
-    }
+	public Map<OperatorInstanceId, OperatorRunnable> getOperatorMap() {
+		return honMap;
+	}
 
-    public void setEndpointList(List<Endpoint> endpointList) {
-        this.endpointList = endpointList;
-    }
+	public void setEndpointList(List<Endpoint> endpointList) {
+		this.endpointList = endpointList;
+	}
 
-    public List<Endpoint> getEndpointList() {
-        return endpointList;
-    }
+	public List<Endpoint> getEndpointList() {
+		return endpointList;
+	}
 
-    public synchronized void start() throws Exception {
-        if (started) {
-            throw new Exception("Joblet already started");
-        }
-        started = true;
-        stats.setStartTime(new Date());
-        notifyAll();
-    }
+	public synchronized void start() throws Exception {
+		if (started) {
+			throw new Exception("Joblet already started");
+		}
+		started = true;
+		stats.setStartTime(new Date());
+		notifyAll();
+	}
 
-    public synchronized void abort() {
-        this.abort = true;
-        for (OperatorRunnable r : honMap.values()) {
-            r.abort();
-        }
-    }
+	public synchronized void abort() {
+		this.abort = true;
+		for (OperatorRunnable r : honMap.values()) {
+			r.abort();
+		}
+	}
 
-    public void installRunnable(final OperatorInstanceId opIId) {
-        pendingOperators.add(opIId);
-        final OperatorRunnable hon = honMap.get(opIId);
-        joblet.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    waitUntilStarted();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                    return;
-                }
-                if (abort) {
-                    return;
-                }
-                try {
-                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                            + opIId.getPartition() + ": STARTING");
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    // notifyOperatorFailure(opIId);
-                }
-                try {
-                    hon.run();
-                    notifyOperatorCompletion(opIId);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    // notifyOperatorFailure(opIId);
-                }
-                try {
-                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
-                            + opIId.getPartition() + ": TERMINATED");
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    // notifyOperatorFailure(opIId);
-                }
-            }
-        });
-    }
+	public void installRunnable(final OperatorInstanceId opIId) {
+		pendingOperators.add(opIId);
+		final OperatorRunnable hon = honMap.get(opIId);
+		joblet.getExecutor().execute(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					waitUntilStarted();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+					return;
+				}
+				if (abort) {
+					return;
+				}
+				try {
+					LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId
+							+ ":" + opIId.getOperatorId() + ":"
+							+ opIId.getPartition() + ": STARTING");
+				} catch (Exception e) {
+					e.printStackTrace();
+					// notifyOperatorFailure(opIId);
+				}
+				try {
+					hon.run();
+					notifyOperatorCompletion(opIId);
+				} catch (Exception e) {
+					e.printStackTrace();
+					notifyOperatorFailure(opIId);
+				}
+				try {
+					LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId
+							+ ":" + opIId.getOperatorId() + ":"
+							+ opIId.getPartition() + ": TERMINATED");
+				} catch (Exception e) {
+					e.printStackTrace();
+					// notifyOperatorFailure(opIId);
+				}
+			}
+		});
+	}
 
-    protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
-        pendingOperators.remove(opIId);
-        if (pendingOperators.isEmpty()) {
-            stats.setEndTime(new Date());
-            try {
-                joblet.notifyStageletComplete(stageId, attempt, stats);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
+	protected synchronized void notifyOperatorCompletion(
+			OperatorInstanceId opIId) {
+		pendingOperators.remove(opIId);
+		if (pendingOperators.isEmpty()) {
+			stats.setEndTime(new Date());
+			try {
+				joblet.notifyStageletComplete(stageId, attempt, stats);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
 
-    protected synchronized void notifyOperatorFailure(OperatorInstanceId opIId) {
-        abort();
-        try {
-            joblet.notifyStageletFailed(stageId, attempt);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+	protected synchronized void notifyOperatorFailure(OperatorInstanceId opIId) {
+		abort();
+		try {
+			joblet.notifyStageletFailed(stageId, attempt);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
 
-    private synchronized void waitUntilStarted() throws InterruptedException {
-        while (!started && !abort) {
-            wait();
-        }
-    }
+	private synchronized void waitUntilStarted() throws InterruptedException {
+		while (!started && !abort) {
+			wait();
+		}
+	}
 
-    public StageletStatistics getStatistics() {
-        return stats;
-    }
+	public StageletStatistics getStatistics() {
+		return stats;
+	}
 }
\ No newline at end of file