Fixed application deployment. Fault recovery works correctly.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@303 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
index 7cdaee1..4d9b5a0 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
@@ -32,6 +32,6 @@
     @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0)")
     public int profileDumpPeriod = 0;
 
-    @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
-    public boolean useJOL = false;
+    @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not specified in the job specification. (default: 5)")
+    public int defaultMaxJobAttempts = 5;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index bf1f0bd..59072f8 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -53,7 +53,7 @@
 
     private final Set<ConstraintExpression> userConstraints;
 
-    private int maxRetries;
+    private int maxAttempts;
 
     public JobSpecification() {
         roots = new ArrayList<OperatorDescriptorId>();
@@ -173,12 +173,12 @@
         return roots;
     }
 
-    public void setMaxRetries(int maxRetries) {
-        this.maxRetries = maxRetries;
+    public void setMaxAttempts(int maxAttempts) {
+        this.maxAttempts = maxAttempts;
     }
 
-    public int getMaxRetries() {
-        return maxRetries;
+    public int getMaxAttempts() {
+        return maxAttempts;
     }
 
     public void addUserConstraint(ConstraintExpression constraint) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 8772940..a4aa482 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -58,6 +58,7 @@
 import edu.uci.ics.hyracks.control.cc.job.manager.events.StageletCompleteEvent;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.StageletFailureEvent;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.UnregisterNodeEvent;
+import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
 import edu.uci.ics.hyracks.control.cc.jobqueue.JobQueue;
 import edu.uci.ics.hyracks.control.cc.scheduler.IScheduler;
 import edu.uci.ics.hyracks.control.cc.scheduler.naive.NaiveScheduler;
@@ -248,14 +249,16 @@
 
     @Override
     public void destroyApplication(String appName) throws Exception {
-        ApplicationDestroyEvent de = new ApplicationDestroyEvent(this, appName);
-        jobQueue.scheduleAndSync(de);
+        FutureValue fv = new FutureValue();
+        jobQueue.schedule(new ApplicationDestroyEvent(this, appName, fv));
+        fv.get();
     }
 
     @Override
     public void startApplication(final String appName) throws Exception {
-        ApplicationStartEvent r = new ApplicationStartEvent(this, appName);
-        jobQueue.scheduleAndSync(r);
+        FutureValue fv = new FutureValue();
+        jobQueue.schedule(new ApplicationStartEvent(this, appName, fv));
+        fv.get();
     }
 
     @Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
index ce22334..69c244a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
@@ -19,33 +19,56 @@
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
 import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationDestroyer;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 
-public class ApplicationDestroyEvent extends SynchronizableRunnable {
+public class ApplicationDestroyEvent implements Runnable {
     private final ClusterControllerService ccs;
     private final String appName;
+    private FutureValue fv;
 
-    public ApplicationDestroyEvent(ClusterControllerService ccs, String appName) {
+    public ApplicationDestroyEvent(ClusterControllerService ccs, String appName, FutureValue fv) {
         this.ccs = ccs;
         this.appName = appName;
+        this.fv = fv;
     }
 
     @Override
-    protected void doRun() throws Exception {
-        ApplicationContext appCtx = ccs.getApplicationMap().remove(appName);
+    public void run() {
+        final ApplicationContext appCtx = ccs.getApplicationMap().remove(appName);
         if (appCtx == null) {
-            throw new HyracksException("No application with name: " + appName);
+            fv.setException(new HyracksException("No application with name: " + appName));
+            return;
         }
         List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
         for (final String nodeId : ccs.getNodeMap().keySet()) {
             opList.add(new ApplicationDestroyer(nodeId, appName));
         }
-        RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
-        RemoteRunner.runRemote(ccs, ops, null);
-        appCtx.deinitialize();
+        final RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
+        ccs.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    RemoteRunner.runRemote(ccs, ops, null);
+                } catch (Exception e) {
+                    fv.setException(e);
+                    return;
+                }
+                ccs.getJobQueue().schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            appCtx.deinitialize();
+                        } catch (Exception e) {
+                            fv.setException(e);
+                        }
+                        fv.setValue(null);
+                    }
+                });
+            }
+        });
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
index db9e150..9f4ad8f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
@@ -20,36 +20,53 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
 import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationStarter;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 
-public class ApplicationStartEvent extends SynchronizableRunnable {
+public class ApplicationStartEvent implements Runnable {
     private final ClusterControllerService ccs;
     private final String appName;
+    private final FutureValue fv;
 
-    public ApplicationStartEvent(ClusterControllerService ccs, String appName) {
+    public ApplicationStartEvent(ClusterControllerService ccs, String appName, FutureValue fv) {
         this.ccs = ccs;
         this.appName = appName;
+        this.fv = fv;
     }
 
     @Override
-    protected void doRun() throws Exception {
+    public void run() {
         ApplicationContext appCtx = ccs.getApplicationMap().get(appName);
         if (appCtx == null) {
-            throw new HyracksException("No application with name: " + appName);
+            fv.setException(new HyracksException("No application with name: " + appName));
+            return;
         }
-        appCtx.initializeClassPath();
-        appCtx.initialize();
-        final byte[] distributedState = JavaSerializationUtils.serialize(appCtx.getDestributedState());
-        final boolean deployHar = appCtx.containsHar();
-        List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
-        for (final String nodeId : ccs.getNodeMap().keySet()) {
-            opList.add(new ApplicationStarter(nodeId, appName, deployHar, distributedState));
+        try {
+            appCtx.initializeClassPath();
+            appCtx.initialize();
+            final byte[] distributedState = JavaSerializationUtils.serialize(appCtx.getDestributedState());
+            final boolean deployHar = appCtx.containsHar();
+            List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
+            for (final String nodeId : ccs.getNodeMap().keySet()) {
+                opList.add(new ApplicationStarter(nodeId, appName, deployHar, distributedState));
+            }
+            final RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
+            ccs.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        RemoteRunner.runRemote(ccs, ops, null);
+                        fv.setValue(null);
+                    } catch (Exception e) {
+                        fv.setException(e);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            fv.setException(e);
         }
-        RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
-        RemoteRunner.runRemote(ccs, ops, null);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAttemptStartEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAttemptStartEvent.java
index 9cc01bb..7dc6496 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAttemptStartEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAttemptStartEvent.java
@@ -33,8 +33,11 @@
     @Override
     public void run() {
         JobRun run = ccs.getRunMap().get(jobId);
-        int maxRetries = run.getJobPlan().getJobSpecification().getMaxRetries();
-        if (run.getAttempts().size() > maxRetries) {
+        int maxAttempts = run.getJobPlan().getJobSpecification().getMaxAttempts();
+        if (maxAttempts == 0) {
+            maxAttempts = ccs.getConfig().defaultMaxJobAttempts;
+        }
+        if (run.getAttempts().size() > maxAttempts) {
             run.setStatus(JobStatus.FAILURE);
             return;
         }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
index 718b913..c4ed077 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -33,43 +32,55 @@
     private ClusterControllerService ccs;
     private UUID jobId;
     private int attempt;
+    private JobStatus status;
 
-    public JobCleanupEvent(ClusterControllerService ccs, UUID jobId, int attempt) {
+    public JobCleanupEvent(ClusterControllerService ccs, UUID jobId, int attempt, JobStatus status) {
         this.ccs = ccs;
         this.jobId = jobId;
         this.attempt = attempt;
+        this.status = status;
     }
 
     @Override
     public void run() {
-        JobRun run = ccs.getRunMap().get(jobId);
-        JobAttempt ja = run.getAttempts().get(attempt);
+        final JobRun run = ccs.getRunMap().get(jobId);
+        final JobAttempt ja = run.getAttempts().get(attempt);
         Set<String> targetNodes = ja.getParticipatingNodeIds();
-        if (!targetNodes.isEmpty()) {
-            JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
-            int i = 0;
-            for (String n : targetNodes) {
-                jcns[i++] = new JobCompleteNotifier(n, jobId);
-            }
-            try {
-                RemoteRunner.runRemote(ccs, jcns, null);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-            CCApplicationContext appCtx = ccs.getApplicationMap().get(ja.getPlan().getApplicationName());
-            if (appCtx != null) {
-                try {
-                    appCtx.notifyJobFinish(jobId);
-                } catch (HyracksException e) {
-                    e.printStackTrace();
-                }
-            }
-            Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-            for (String nodeId : ja.getParticipatingNodeIds()) {
-                NodeControllerState state = nodeMap.get(nodeId);
-                state.getActiveJobIds().remove(jobId);
-            }
+        final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
+        int i = 0;
+        for (String n : targetNodes) {
+            jcns[i++] = new JobCompleteNotifier(n, jobId);
         }
-        run.setStatus(JobStatus.TERMINATED);
+        ccs.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                if (jcns.length > 0) {
+                    try {
+                        RemoteRunner.runRemote(ccs, jcns, null);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+                ccs.getJobQueue().schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        CCApplicationContext appCtx = ccs.getApplicationMap().get(ja.getPlan().getApplicationName());
+                        if (appCtx != null) {
+                            try {
+                                appCtx.notifyJobFinish(jobId);
+                            } catch (HyracksException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+                        for (String nodeId : ja.getParticipatingNodeIds()) {
+                            NodeControllerState state = nodeMap.get(nodeId);
+                            state.getActiveJobIds().remove(jobId);
+                        }
+                        run.setStatus(status);
+                    }
+                });
+            }
+        });
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 43e8e0e..11850ee 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -18,11 +18,16 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
 
 public class RemoveDeadNodesEvent implements Runnable {
+    private static Logger LOGGER = Logger.getLogger(RemoveDeadNodesEvent.class.getName());
+
     private final ClusterControllerService ccs;
 
     public RemoveDeadNodesEvent(ClusterControllerService ccs) {
@@ -37,11 +42,13 @@
             NodeControllerState state = e.getValue();
             if (state.incrementLastHeartbeatDuration() >= ccs.getConfig().maxHeartbeatLapsePeriods) {
                 deadNodes.add(e.getKey());
+                LOGGER.info(e.getKey() + " considered dead");
             }
         }
         for (String deadNode : deadNodes) {
             NodeControllerState state = nodeMap.remove(deadNode);
             for (final UUID jid : state.getActiveJobIds()) {
+                LOGGER.info("Aborting: " + jid);
                 ccs.getJobQueue().schedule(new JobAbortEvent(ccs, jid));
             }
         }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ScheduleRunnableStagesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ScheduleRunnableStagesEvent.java
index 1a4cbc5..c4839da 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ScheduleRunnableStagesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ScheduleRunnableStagesEvent.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobAttempt;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -64,7 +65,7 @@
                 + scheduledStages);
         if (pendingStages.size() == 1 && scheduledStages.isEmpty()) {
             LOGGER.info(jobId + ":" + attempt + ":No more runnable stages");
-            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobId, attempt));
+            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobId, attempt, JobStatus.TERMINATED));
             return;
         }
 
@@ -90,13 +91,14 @@
         } catch (HyracksException e) {
             e.printStackTrace();
             ccs.getJobQueue().schedule(new JobAbortEvent(ccs, jobId));
+            return;
         }
 
-        JobPlan plan = run.getJobPlan();
-        for (JobStageAttempt jsa : runnableStageAttempts) {
+        final JobPlan plan = run.getJobPlan();
+        for (final JobStageAttempt jsa : runnableStageAttempts) {
             ISchedule schedule = jsa.getSchedule();
-            Map<OperatorDescriptorId, Integer> partCountMap = new HashMap<OperatorDescriptorId, Integer>();
-            Map<String, Map<ActivityNodeId, Set<Integer>>> targetMap = new HashMap<String, Map<ActivityNodeId, Set<Integer>>>();
+            final Map<OperatorDescriptorId, Integer> partCountMap = new HashMap<OperatorDescriptorId, Integer>();
+            final Map<String, Map<ActivityNodeId, Set<Integer>>> targetMap = new HashMap<String, Map<ActivityNodeId, Set<Integer>>>();
             for (ActivityNodeId aid : jsa.getJobStage().getTasks()) {
                 String[] locations = schedule.getPartitions(aid);
                 partCountMap.put(aid.getOperatorDescriptorId(), locations.length);
@@ -116,40 +118,50 @@
                 }
             }
 
-            Phase1Installer p1is[] = new Phase1Installer[targetMap.size()];
-            int i = 0;
             for (String nid : targetMap.keySet()) {
-                p1is[i] = new Phase1Installer(nid, plan.getJobId(), plan.getApplicationName(), plan, jsa.getJobStage()
-                        .getId(), jsa.getJobAttempt().getAttempt(), targetMap.get(nid), partCountMap);
-                ++i;
+                ccs.getNodeMap().get(nid).getActiveJobIds().add(jobId);
             }
-            LOGGER.info("Stage start - Phase 1");
-            try {
-                Map<PortInstanceId, Endpoint> globalPortMap = RemoteRunner.runRemote(ccs, p1is,
-                        new PortMapMergingAccumulator());
 
-                Phase2Installer[] p2is = new Phase2Installer[targetMap.size()];
-                Phase3Installer[] p3is = new Phase3Installer[targetMap.size()];
-                StageStarter[] ss = new StageStarter[targetMap.size()];
+            ccs.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    Phase1Installer p1is[] = new Phase1Installer[targetMap.size()];
+                    int i = 0;
+                    for (String nid : targetMap.keySet()) {
+                        p1is[i] = new Phase1Installer(nid, plan.getJobId(), plan.getApplicationName(), plan, jsa
+                                .getJobStage().getId(), jsa.getJobAttempt().getAttempt(), targetMap.get(nid),
+                                partCountMap);
+                        ++i;
+                    }
+                    LOGGER.info("Stage start - Phase 1");
+                    try {
+                        Map<PortInstanceId, Endpoint> globalPortMap = RemoteRunner.runRemote(ccs, p1is,
+                                new PortMapMergingAccumulator());
 
-                i = 0;
-                for (String nid : targetMap.keySet()) {
-                    p2is[i] = new Phase2Installer(nid, plan.getJobId(), plan.getApplicationName(), plan, jsa
-                            .getJobStage().getId(), targetMap.get(nid), partCountMap, globalPortMap);
-                    p3is[i] = new Phase3Installer(nid, plan.getJobId(), jsa.getJobStage().getId());
-                    ss[i] = new StageStarter(nid, plan.getJobId(), jsa.getJobStage().getId());
-                    ++i;
+                        Phase2Installer[] p2is = new Phase2Installer[targetMap.size()];
+                        Phase3Installer[] p3is = new Phase3Installer[targetMap.size()];
+                        StageStarter[] ss = new StageStarter[targetMap.size()];
+
+                        i = 0;
+                        for (String nid : targetMap.keySet()) {
+                            p2is[i] = new Phase2Installer(nid, plan.getJobId(), plan.getApplicationName(), plan, jsa
+                                    .getJobStage().getId(), targetMap.get(nid), partCountMap, globalPortMap);
+                            p3is[i] = new Phase3Installer(nid, plan.getJobId(), jsa.getJobStage().getId());
+                            ss[i] = new StageStarter(nid, plan.getJobId(), jsa.getJobStage().getId());
+                            ++i;
+                        }
+                        LOGGER.info("Stage start - Phase 2");
+                        RemoteRunner.runRemote(ccs, p2is, null);
+                        LOGGER.info("Stage start - Phase 3");
+                        RemoteRunner.runRemote(ccs, p3is, null);
+                        LOGGER.info("Stage start");
+                        RemoteRunner.runRemote(ccs, ss, null);
+                        LOGGER.info("Stage started");
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
                 }
-                LOGGER.info("Stage start - Phase 2");
-                RemoteRunner.runRemote(ccs, p2is, null);
-                LOGGER.info("Stage start - Phase 3");
-                RemoteRunner.runRemote(ccs, p3is, null);
-                LOGGER.info("Stage start");
-                RemoteRunner.runRemote(ccs, ss, null);
-                LOGGER.info("Stage started");
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+            });
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/FutureValue.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/FutureValue.java
new file mode 100644
index 0000000..25378c6
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/FutureValue.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.jobqueue;
+
+public class FutureValue {
+    private boolean done;
+
+    private Object value;
+
+    private Exception e;
+
+    public FutureValue() {
+        done = false;
+        value = null;
+        e = null;
+    }
+
+    public synchronized void setValue(Object value) {
+        done = true;
+        this.value = value;
+        e = null;
+        notifyAll();
+    }
+
+    public synchronized void setException(Exception e) {
+        done = true;
+        this.e = e;
+        value = null;
+        notifyAll();
+    }
+
+    public synchronized void reset() {
+        done = false;
+        value = null;
+        e = null;
+        notifyAll();
+    }
+
+    public synchronized Object get() throws Exception {
+        while (!done) {
+            wait();
+        }
+        if (e != null) {
+            throw e;
+        }
+        return value;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java
index 168f1c2..9bd4375 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java
@@ -28,8 +28,6 @@
         final Semaphore installComplete = new Semaphore(remoteOps.length);
         final List<Exception> errors = new Vector<Exception>();
         for (final RemoteOp<T> remoteOp : remoteOps) {
-            System.err.println(ccs.getNodeMap());
-            System.err.println(remoteOp.getNodeId());
             NodeControllerState nodeState = ccs.getNodeMap().get(remoteOp.getNodeId());
             final INodeController node = nodeState.getNodeController();
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
index 89ebe02..f40e8cd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
@@ -54,6 +54,9 @@
 
     private ISchedule computeSchedule(JobStageAttempt jsa, Set<OperatorDescriptorId> operators) throws HyracksException {
         Set<String> nodeSet = ccs.getNodeMap().keySet();
+        if (nodeSet.isEmpty()) {
+            throw new HyracksException("0 usable nodes found");
+        }
         String[] liveNodes = ccs.getNodeMap().keySet().toArray(new String[nodeSet.size()]);
         JobAttempt ja = jsa.getJobAttempt();
         final JobAttemptState jas = (JobAttemptState) ja.getSchedulerState();
@@ -101,6 +104,9 @@
                                         }
                                     }
                                 }
+                                if (unassignedPartsIds.get(part)) {
+                                    throw new HyracksException("Unsatisfiable constraint for operator: " + oid);
+                                }
                             }
                         }
                     }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 2c262e5..2335c4e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -561,7 +561,7 @@
 
     @Override
     public synchronized void abortJoblet(UUID jobId) throws Exception {
-        Joblet ji = jobletMap.get(jobId);
+        Joblet ji = jobletMap.remove(jobId);
         if (ji != null) {
             for (Stagelet stagelet : ji.getStageletMap().values()) {
                 stagelet.abort();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 858ff6c..2bb1d80 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -43,7 +43,6 @@
         CCConfig ccConfig = new CCConfig();
         ccConfig.port = 39001;
         ccConfig.profileDumpPeriod = 1000;
-        ccConfig.useJOL = true;
         cc = new ClusterControllerService(ccConfig);
         cc.start();
 
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 121e126..b2eccdf 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -1,8 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.hyracks.examples.tpch.client;
 
 import java.io.File;
 import java.util.UUID;
 
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
 import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
@@ -30,51 +47,80 @@
 import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 
 public class Main {
-    public static void main(String[] args) throws Exception {
-        String appName = args[0];
-        String host;
-        int port = 1099;
-        switch (args.length) {
-            case 3:
-                port = Integer.parseInt(args[2]);
-            case 2:
-                host = args[1];
-                break;
-            default:
-                System.err.println("One or Two arguments expected: <cchost> [<ccport>]");
-                return;
-        }
-        IHyracksClientConnection hcc = new HyracksRMIConnection(host, port);
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
 
-        JobSpecification job = createJob();
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)", required = false)
+        public int port = 1099;
+
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
+
+        @Option(name = "-infile-customer-splits", usage = "Comma separated list of file-splits for the CUSTOMER input. A file-split is <node-name>:<path>", required = true)
+        public String inFileCustomerSplits;
+
+        @Option(name = "-infile-order-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true)
+        public String inFileOrderSplits;
+
+        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+        public String outFileSplits;
+
+        @Option(name = "-num-join-partitions", usage = "Number of Join partitions to use (default: 1)", required = false)
+        public int numJoinPartitions = 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+
+        JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
+                parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
+                options.numJoinPartitions);
 
         long start = System.currentTimeMillis();
-        UUID jobId = hcc.createJob(appName, job);
+        UUID jobId = hcc.createJob(options.app, job);
         hcc.start(jobId);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
     }
 
-    private static JobSpecification createJob() {
+    private static FileSplit[] parseFileSplits(String fileSplits) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            if (idx < 0) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+        }
+        return fSplits;
+    }
+
+    private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
+            FileSplit[] resultSplits, int numJoinPartitions) {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] custSplits = createCustomerFileSplits();
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-        FileSplit[] ordersSplits = createOrdersFileSplits();
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -99,7 +145,7 @@
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        createRRPartitionConstraint(spec, ordScanner, 2);
+        createPartitionConstraint(spec, ordScanner, orderSplits);
 
         FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
                 new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
@@ -107,13 +153,13 @@
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        createRRPartitionConstraint(spec, custScanner, 2);
+        createPartitionConstraint(spec, custScanner, customerSplits);
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
                 new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc,
                 6000000);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 4);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
 
         RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
@@ -126,10 +172,11 @@
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                 new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
                 groupResultDesc, 16);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, gby, 4);
+        createPartitionConstraint(spec, gby, resultSplits);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, printer, 4);
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
+        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        createPartitionConstraint(spec, writer, resultSplits);
 
         IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
@@ -147,44 +194,17 @@
         spec.connect(joinGroupConn, join, 0, gby, 0);
 
         IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(gbyPrinterConn, gby, 0, printer, 0);
+        spec.connect(gbyPrinterConn, gby, 0, writer, 0);
 
-        spec.addRoot(printer);
+        spec.addRoot(writer);
         return spec;
     }
 
-    private static FileSplit[] createOrdersFileSplits() {
-        FileSplit fss[] = new FileSplit[2];
-        for (int i = 0; i < fss.length; ++i) {
-            fss[i] = new FileSplit("foo", new FileReference(new File("data/tpch0.001/orders-part" + (i + 1) + ".tbl")));
+    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+        String[] parts = new String[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            parts[i] = splits[i].getNodeName();
         }
-        return fss;
-    }
-
-    private static FileSplit[] createCustomerFileSplits() {
-        FileSplit fss[] = new FileSplit[2];
-        for (int i = 0; i < fss.length; ++i) {
-            fss[i] = new FileSplit("foo",
-                    new FileReference(new File("data/tpch0.001/customer-part" + (i + 1) + ".tbl")));
-        }
-        return fss;
-    }
-
-    private static final String[] NODES = { "NC1", "NC2" };
-
-    private static void createRRPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, int nChoices) {
-        String[][] choices = new String[2][];
-        for (int i = 0; i < choices.length; ++i) {
-            choices[i] = createRRSteppedChoiceConstraint(i, nChoices);
-        }
-        PartitionConstraintHelper.addLocationChoiceConstraint(spec, op, choices);
-    }
-
-    private static String[] createRRSteppedChoiceConstraint(int index, int nChoices) {
-        String[] lcs = new String[nChoices];
-        for (int i = 0; i < nChoices; ++i) {
-            lcs[i] = NODES[(index + i) % NODES.length];
-        }
-        return lcs;
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
     }
 }
\ No newline at end of file