Execption raised by a operator is not handled correctly and hence the job continues to be in a running state.
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@153 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index d6c3ee2..587fd01 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -32,139 +32,145 @@
import edu.uci.ics.hyracks.control.nc.runtime.OperatorRunnable;
public class Stagelet {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(Stagelet.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(Stagelet.class
+ .getName());
- private final Joblet joblet;
+ private final Joblet joblet;
- private final UUID stageId;
+ private final UUID stageId;
- private final int attempt;
+ private final int attempt;
- private final Map<OperatorInstanceId, OperatorRunnable> honMap;
+ private final Map<OperatorInstanceId, OperatorRunnable> honMap;
- private List<Endpoint> endpointList;
+ private List<Endpoint> endpointList;
- private boolean started;
+ private boolean started;
- private volatile boolean abort;
+ private volatile boolean abort;
- private final Set<OperatorInstanceId> pendingOperators;
+ private final Set<OperatorInstanceId> pendingOperators;
- private final StageletStatistics stats;
+ private final StageletStatistics stats;
- public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId) throws RemoteException {
- this.joblet = joblet;
- this.stageId = stageId;
- this.attempt = attempt;
- pendingOperators = new HashSet<OperatorInstanceId>();
- started = false;
- honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
- stats = new StageletStatistics();
- stats.setNodeId(nodeId);
- }
+ public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId)
+ throws RemoteException {
+ this.joblet = joblet;
+ this.stageId = stageId;
+ this.attempt = attempt;
+ pendingOperators = new HashSet<OperatorInstanceId>();
+ started = false;
+ honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
+ stats = new StageletStatistics();
+ stats.setNodeId(nodeId);
+ }
- public void setOperator(OperatorDescriptorId odId, int partition, OperatorRunnable hon) {
- honMap.put(new OperatorInstanceId(odId, partition), hon);
- }
+ public void setOperator(OperatorDescriptorId odId, int partition,
+ OperatorRunnable hon) {
+ honMap.put(new OperatorInstanceId(odId, partition), hon);
+ }
- public Map<OperatorInstanceId, OperatorRunnable> getOperatorMap() {
- return honMap;
- }
+ public Map<OperatorInstanceId, OperatorRunnable> getOperatorMap() {
+ return honMap;
+ }
- public void setEndpointList(List<Endpoint> endpointList) {
- this.endpointList = endpointList;
- }
+ public void setEndpointList(List<Endpoint> endpointList) {
+ this.endpointList = endpointList;
+ }
- public List<Endpoint> getEndpointList() {
- return endpointList;
- }
+ public List<Endpoint> getEndpointList() {
+ return endpointList;
+ }
- public synchronized void start() throws Exception {
- if (started) {
- throw new Exception("Joblet already started");
- }
- started = true;
- stats.setStartTime(new Date());
- notifyAll();
- }
+ public synchronized void start() throws Exception {
+ if (started) {
+ throw new Exception("Joblet already started");
+ }
+ started = true;
+ stats.setStartTime(new Date());
+ notifyAll();
+ }
- public synchronized void abort() {
- this.abort = true;
- for (OperatorRunnable r : honMap.values()) {
- r.abort();
- }
- }
+ public synchronized void abort() {
+ this.abort = true;
+ for (OperatorRunnable r : honMap.values()) {
+ r.abort();
+ }
+ }
- public void installRunnable(final OperatorInstanceId opIId) {
- pendingOperators.add(opIId);
- final OperatorRunnable hon = honMap.get(opIId);
- joblet.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- waitUntilStarted();
- } catch (InterruptedException e) {
- e.printStackTrace();
- return;
- }
- if (abort) {
- return;
- }
- try {
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + ": STARTING");
- } catch (Exception e) {
- e.printStackTrace();
- // notifyOperatorFailure(opIId);
- }
- try {
- hon.run();
- notifyOperatorCompletion(opIId);
- } catch (Exception e) {
- e.printStackTrace();
- // notifyOperatorFailure(opIId);
- }
- try {
- LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
- + opIId.getPartition() + ": TERMINATED");
- } catch (Exception e) {
- e.printStackTrace();
- // notifyOperatorFailure(opIId);
- }
- }
- });
- }
+ public void installRunnable(final OperatorInstanceId opIId) {
+ pendingOperators.add(opIId);
+ final OperatorRunnable hon = honMap.get(opIId);
+ joblet.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ waitUntilStarted();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return;
+ }
+ if (abort) {
+ return;
+ }
+ try {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId
+ + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + ": STARTING");
+ } catch (Exception e) {
+ e.printStackTrace();
+ // notifyOperatorFailure(opIId);
+ }
+ try {
+ hon.run();
+ notifyOperatorCompletion(opIId);
+ } catch (Exception e) {
+ e.printStackTrace();
+ notifyOperatorFailure(opIId);
+ }
+ try {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId
+ + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + ": TERMINATED");
+ } catch (Exception e) {
+ e.printStackTrace();
+ // notifyOperatorFailure(opIId);
+ }
+ }
+ });
+ }
- protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
- pendingOperators.remove(opIId);
- if (pendingOperators.isEmpty()) {
- stats.setEndTime(new Date());
- try {
- joblet.notifyStageletComplete(stageId, attempt, stats);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
+ protected synchronized void notifyOperatorCompletion(
+ OperatorInstanceId opIId) {
+ pendingOperators.remove(opIId);
+ if (pendingOperators.isEmpty()) {
+ stats.setEndTime(new Date());
+ try {
+ joblet.notifyStageletComplete(stageId, attempt, stats);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
- protected synchronized void notifyOperatorFailure(OperatorInstanceId opIId) {
- abort();
- try {
- joblet.notifyStageletFailed(stageId, attempt);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+ protected synchronized void notifyOperatorFailure(OperatorInstanceId opIId) {
+ abort();
+ try {
+ joblet.notifyStageletFailed(stageId, attempt);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
- private synchronized void waitUntilStarted() throws InterruptedException {
- while (!started && !abort) {
- wait();
- }
- }
+ private synchronized void waitUntilStarted() throws InterruptedException {
+ while (!started && !abort) {
+ wait();
+ }
+ }
- public StageletStatistics getStatistics() {
- return stats;
- }
+ public StageletStatistics getStatistics() {
+ return stats;
+ }
}
\ No newline at end of file