Merged r497:500 from trunk

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@501 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/pom.xml b/hyracks-api/pom.xml
index 1a7783d..5fd9a06 100644
--- a/hyracks-api/pom.xml
+++ b/hyracks-api/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-api</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
diff --git a/hyracks-cli/pom.xml b/hyracks-cli/pom.xml
index a5ce7fd..6fd6160 100644
--- a/hyracks-cli/pom.xml
+++ b/hyracks-cli/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-cli</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -89,7 +89,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   </dependencies>
diff --git a/hyracks-control-cc/pom.xml b/hyracks-control-cc/pom.xml
index ba425cf..2fd7dd1 100644
--- a/hyracks-control-cc/pom.xml
+++ b/hyracks-control-cc/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-control-cc</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,7 +27,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-common</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index e0c3ee1..583d791 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -60,7 +60,6 @@
 import edu.uci.ics.hyracks.control.cc.job.manager.events.UnregisterNodeEvent;
 import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
 import edu.uci.ics.hyracks.control.cc.jobqueue.JobQueue;
-import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
@@ -99,8 +98,6 @@
 
     private final JobQueue jobQueue;
 
-    private final JobScheduler scheduler;
-
     private final Executor taskExecutor;
 
     private final Timer timer;
@@ -120,7 +117,6 @@
         webServer = new WebServer(this);
         runMap = new HashMap<UUID, JobRun>();
         jobQueue = new JobQueue();
-        scheduler = new JobScheduler(this);
         this.timer = new Timer(true);
         ccci = new CCClientInterface(this);
         ccContext = new ICCContext() {
@@ -164,10 +160,6 @@
         return jobQueue;
     }
 
-    public JobScheduler getScheduler() {
-        return scheduler;
-    }
-
     public Executor getExecutor() {
         return taskExecutor;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index be4945d..f7f3cb1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -23,7 +23,6 @@
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.cc.scheduler.ActivityClusterStateMachine;
 
 public class ActivityCluster {
     private final JobRun jobRun;
@@ -40,8 +39,6 @@
 
     private Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
 
-    private ActivityClusterStateMachine acsm;
-
     private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
 
     private Set<TaskCluster> inProgressTaskClusters;
@@ -72,6 +69,10 @@
         return dependencies;
     }
 
+    public Set<ActivityCluster> getDependents() {
+        return dependents;
+    }
+
     public Map<ActivityId, Task[]> getTaskMap() {
         return taskStateMap;
     }
@@ -88,14 +89,6 @@
         return partitionProducingTaskClusterMap;
     }
 
-    public ActivityClusterStateMachine getStateMachine() {
-        return acsm;
-    }
-
-    public void setStateMachine(ActivityClusterStateMachine acsm) {
-        this.acsm = acsm;
-    }
-
     public JobRun getJobRun() {
         return jobRun;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index b16b4c1..d4a9283 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -25,7 +25,7 @@
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
-import edu.uci.ics.hyracks.control.cc.scheduler.JobRunStateMachine;
+import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 
 public class JobRun implements IJobStatusConditionVariable {
@@ -39,9 +39,11 @@
 
     private final JobProfile profile;
 
+    private Set<ActivityCluster> activityClusters;
+
     private final Map<ActivityId, ActivityCluster> activityClusterMap;
 
-    private JobRunStateMachine jsm;
+    private JobScheduler js;
 
     private JobStatus status;
 
@@ -100,19 +102,23 @@
         return profile;
     }
 
-    public void setStateMachine(JobRunStateMachine jsm) {
-        this.jsm = jsm;
+    public void setScheduler(JobScheduler js) {
+        this.js = js;
     }
 
-    public JobRunStateMachine getStateMachine() {
-        return jsm;
+    public JobScheduler getScheduler() {
+        return js;
     }
 
     public Map<ActivityId, ActivityCluster> getActivityClusterMap() {
         return activityClusterMap;
     }
 
-    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
-        jsm.notifyNodeFailures(deadNodes);
+    public Set<ActivityCluster> getActivityClusters() {
+        return activityClusters;
+    }
+
+    public void setActivityClusters(Set<ActivityCluster> activityClusters) {
+        this.activityClusters = activityClusters;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
index f17ac12..22f5587 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
@@ -15,7 +15,6 @@
 package edu.uci.ics.hyracks.control.cc.job;
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 
 public class TaskAttempt {
     public enum TaskStatus {
@@ -76,12 +75,4 @@
         this.status = status;
         this.exception = exception;
     }
-
-    public void notifyTaskComplete() throws HyracksException {
-        taskState.getTaskCluster().notifyTaskComplete(this);
-    }
-
-    public void notifyTaskFailure(Exception exception) throws HyracksException {
-        taskState.getTaskCluster().notifyTaskFailure(this, exception);
-    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index a0b2b43..ae2533e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -20,11 +20,10 @@
 import java.util.List;
 import java.util.Set;
 
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
 public class TaskCluster {
-    private final ActivityCluster activityCluster;
+    private final ActivityCluster ac;
 
     private final Task[] tasks;
 
@@ -34,14 +33,18 @@
 
     private final List<TaskClusterAttempt> taskClusterAttempts;
 
-    public TaskCluster(ActivityCluster activityCluster, Task[] tasks) {
-        this.activityCluster = activityCluster;
+    public TaskCluster(ActivityCluster ac, Task[] tasks) {
+        this.ac = ac;
         this.tasks = tasks;
         this.producedPartitions = new HashSet<PartitionId>();
         this.requiredPartitions = new HashSet<PartitionId>();
         taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
     }
 
+    public ActivityCluster getActivityCluster() {
+        return ac;
+    }
+
     public Task[] getTasks() {
         return tasks;
     }
@@ -58,14 +61,6 @@
         return taskClusterAttempts;
     }
 
-    public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
-        activityCluster.getStateMachine().notifyTaskComplete(ta);
-    }
-
-    public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException {
-        activityCluster.getStateMachine().notifyTaskFailure(ta, exception);
-    }
-
     @Override
     public String toString() {
         return "TC:" + Arrays.toString(tasks);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
index 603623b..3b49db3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
 import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
+import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
 
 public class JobCreateEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
@@ -70,7 +71,8 @@
         run.setStatus(JobStatus.INITIALIZED, null);
 
         ccs.getRunMap().put(jobId, run);
-        ccs.getScheduler().notifyJobCreation(run);
+        JobScheduler jrs = new JobScheduler(ccs, run);
+        run.setScheduler(jrs);
         appCtx.notifyJobCreation(jobId, spec);
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
index 5fd70b4..64515a1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
@@ -40,6 +40,10 @@
             throw new Exception("Job already started");
         }
         run.setStatus(JobStatus.RUNNING, null);
-        run.getStateMachine().schedule();
+        try {
+            run.getScheduler().startJob();
+        } catch (Exception e) {
+            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, run.getJobId(), JobStatus.FAILURE, e));
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 27322f5..7194a20 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -70,7 +70,7 @@
             JobRun run = ccs.getRunMap().get(jobId);
             if (run != null) {
                 try {
-                    run.notifyNodeFailures(deadNodes);
+                    run.getScheduler().notifyNodeFailures(deadNodes);
                 } catch (HyracksException e) {
                     throw new RuntimeException(e);
                 }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
index 29a341f..0d33c21 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
@@ -19,6 +19,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
 
 public class TaskCompleteEvent extends AbstractTaskLifecycleEvent {
@@ -29,7 +30,8 @@
     @Override
     protected void performEvent(TaskAttempt ta) {
         try {
-            ta.notifyTaskComplete();
+            ActivityCluster ac = ta.getTaskState().getTaskCluster().getActivityCluster();
+            ac.getJobRun().getScheduler().notifyTaskComplete(ta, ac);
         } catch (HyracksException e) {
             e.printStackTrace();
         }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
index bd6e71e..9654417 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
@@ -19,6 +19,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
 
 public class TaskFailureEvent extends AbstractTaskLifecycleEvent {
@@ -33,7 +34,8 @@
     @Override
     protected void performEvent(TaskAttempt ta) {
         try {
-            ta.notifyTaskFailure(exception);
+            ActivityCluster ac = ta.getTaskState().getTaskCluster().getActivityCluster();
+            ac.getJobRun().getScheduler().notifyTaskFailure(ta, ac, exception);
         } catch (HyracksException e) {
             e.printStackTrace();
         }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
index 704ab51..81883d67 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
@@ -78,7 +78,7 @@
         return null;
     }
 
-    public ActivityCluster inferStages(JobActivityGraph jag) {
+    public Set<ActivityCluster> inferActivityClusters(JobActivityGraph jag) {
         JobSpecification spec = jag.getJobSpecification();
 
         /*
@@ -106,11 +106,8 @@
             }
         }
 
-        ActivityCluster endStage = new ActivityCluster(jobRun, new HashSet<ActivityId>());
         Map<ActivityId, Set<ActivityId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
         for (ActivityCluster s : stages) {
-            endStage.addDependency(s);
-            s.addDependent(endStage);
             Set<ActivityCluster> blockedStages = new HashSet<ActivityCluster>();
             for (ActivityId t : s.getActivities()) {
                 Set<ActivityId> blockedTasks = blocker2BlockedMap.get(t);
@@ -125,15 +122,21 @@
                 s.addDependent(bs);
             }
         }
+        Set<ActivityCluster> roots = new HashSet<ActivityCluster>();
+        for (ActivityCluster s : stages) {
+            if (s.getDependents().isEmpty()) {
+                roots.add(s);
+            }
+        }
+        jobRun.setActivityClusters(stages);
         jobRun.getActivityClusterMap().putAll(stageMap);
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
+            LOGGER.info("Inferred " + stages.size() + " stages");
             for (ActivityCluster s : stages) {
                 LOGGER.info(s.toString());
             }
-            LOGGER.info("SID: ENDSTAGE");
         }
-        return endStage;
+        return roots;
     }
 
     private void merge(Map<ActivityId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityId t1,
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterStateMachine.java
deleted file mode 100644
index 47eeb06..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterStateMachine.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.scheduler;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.job.Task;
-import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
-import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-
-public class ActivityClusterStateMachine {
-    private static final Logger LOGGER = Logger.getLogger(ActivityClusterStateMachine.class.getName());
-
-    private final ClusterControllerService ccs;
-
-    private final JobRunStateMachine jsm;
-
-    private final ActivityCluster ac;
-
-    public ActivityClusterStateMachine(ClusterControllerService ccs, JobRunStateMachine jsm, ActivityCluster ac) {
-        this.ccs = ccs;
-        this.jsm = jsm;
-        this.ac = ac;
-    }
-
-    public void schedule() throws HyracksException {
-        startRunnableTaskClusters();
-    }
-
-    private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
-            throws HyracksException {
-        JobRun jobRun = ac.getJobRun();
-        JobActivityGraph jag = jobRun.getJobActivityGraph();
-        Task[] tasks = tc.getTasks();
-        List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
-        int attempts = tcAttempts.size();
-        TaskClusterAttempt tcAttempt = new TaskClusterAttempt(tc, attempts);
-        TaskAttempt[] taskAttempts = new TaskAttempt[tasks.length];
-        Map<TaskId, LValueConstraintExpression> locationMap = new HashMap<TaskId, LValueConstraintExpression>();
-        for (int i = 0; i < tasks.length; ++i) {
-            Task ts = tasks[i];
-            TaskId tid = ts.getTaskId();
-            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(),
-                    tid.getPartition()), attempts), ts);
-            taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
-            locationMap.put(tid,
-                    new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition()));
-            taskAttempts[i] = taskAttempt;
-        }
-        tcAttempt.setTaskAttempts(taskAttempts);
-        PartitionConstraintSolver solver = jsm.getSolver();
-        solver.solve(locationMap.values());
-        for (int i = 0; i < tasks.length; ++i) {
-            Task ts = tasks[i];
-            TaskId tid = ts.getTaskId();
-            TaskAttempt taskAttempt = taskAttempts[i];
-            ActivityId aid = tid.getActivityId();
-            Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(aid);
-            String nodeId = null;
-            if (blockers != null) {
-                for (ActivityId blocker : blockers) {
-                    nodeId = findLocationOfBlocker(jobRun, jag, new TaskId(blocker, tid.getPartition()));
-                    if (nodeId != null) {
-                        break;
-                    }
-                }
-            }
-            Set<String> liveNodes = ccs.getNodeMap().keySet();
-            if (nodeId == null) {
-                LValueConstraintExpression pLocationExpr = locationMap.get(tid);
-                Object location = solver.getValue(pLocationExpr);
-                if (location == null) {
-                    // pick any
-                    nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt())
-                            % liveNodes.size()];
-                } else if (location instanceof String) {
-                    nodeId = (String) location;
-                } else if (location instanceof String[]) {
-                    for (String choice : (String[]) location) {
-                        if (liveNodes.contains(choice)) {
-                            nodeId = choice;
-                            break;
-                        }
-                    }
-                    if (nodeId == null) {
-                        throw new HyracksException("No satisfiable location found for "
-                                + taskAttempt.getTaskAttemptId());
-                    }
-                } else {
-                    throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "("
-                            + location.getClass() + ")");
-                }
-            }
-            if (nodeId == null) {
-                throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId());
-            }
-            if (!liveNodes.contains(nodeId)) {
-                throw new HyracksException("Node " + nodeId + " not live");
-            }
-            taskAttempt.setNodeId(nodeId);
-            taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null);
-            List<TaskAttemptDescriptor> tads = taskAttemptMap.get(nodeId);
-            if (tads == null) {
-                tads = new ArrayList<TaskAttemptDescriptor>();
-                taskAttemptMap.put(nodeId, tads);
-            }
-            ActivityPartitionDetails apd = ts.getActivityPartitionDetails();
-            tads.add(new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(), apd.getPartitionCount(), apd
-                    .getInputPartitionCounts(), apd.getOutputPartitionCounts()));
-        }
-        tcAttempt.initializePendingTaskCounter();
-        tcAttempts.add(tcAttempt);
-        tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
-        ac.getInProgressTaskClusters().add(tc);
-    }
-
-    private String findLocationOfBlocker(JobRun jobRun, JobActivityGraph jag, TaskId tid) {
-        ActivityId blockerAID = tid.getActivityId();
-        ActivityCluster blockerAC = jobRun.getActivityClusterMap().get(blockerAID);
-        Task[] blockerTasks = blockerAC.getTaskMap().get(blockerAID);
-        List<TaskClusterAttempt> tcAttempts = blockerTasks[tid.getPartition()].getTaskCluster().getAttempts();
-        if (tcAttempts == null || tcAttempts.isEmpty()) {
-            return null;
-        }
-        TaskClusterAttempt lastTCA = tcAttempts.get(tcAttempts.size() - 1);
-        for (TaskAttempt ta : lastTCA.getTaskAttempts()) {
-            TaskId blockerTID = ta.getTaskAttemptId().getTaskId();
-            if (tid.equals(blockerTID)) {
-                return ta.getNodeId();
-            }
-        }
-        return null;
-    }
-
-    private TaskClusterAttempt findLastTaskClusterAttempt(TaskCluster tc) {
-        List<TaskClusterAttempt> attempts = tc.getAttempts();
-        if (!attempts.isEmpty()) {
-            return attempts.get(attempts.size() - 1);
-        }
-        return null;
-    }
-
-    public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
-        TaskAttemptId taId = ta.getTaskAttemptId();
-        TaskCluster tc = ta.getTaskState().getTaskCluster();
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
-        if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
-            TaskAttempt.TaskStatus taStatus = ta.getStatus();
-            if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
-                ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
-                if (lastAttempt.decrementPendingTasksCounter() == 0) {
-                    lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
-                    ac.getInProgressTaskClusters().remove(tc);
-                    startRunnableTaskClusters();
-                }
-            } else {
-                LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
-            }
-        } else {
-            LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
-        }
-    }
-
-    private void startRunnableTaskClusters() throws HyracksException {
-        PriorityQueue<RankedRunnableTaskCluster> queue = findRunnableTaskClusters();
-        Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
-        for (RankedRunnableTaskCluster rrtc : queue) {
-            TaskCluster tc = rrtc.getTaskCluster();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Found runnable TC: " + tc);
-                List<TaskClusterAttempt> attempts = tc.getAttempts();
-                LOGGER.info("Attempts so far:" + attempts.size());
-                for (TaskClusterAttempt tcAttempt : attempts) {
-                    LOGGER.info("Status: " + tcAttempt.getStatus());
-                }
-            }
-            try {
-                assignTaskLocations(tc, taskAttemptMap);
-            } catch (HyracksException e) {
-                abort();
-                ac.getJobRun().getStateMachine().notifyActivityClusterFailure(ac, e);
-                return;
-            }
-        }
-
-        if (taskAttemptMap.isEmpty()) {
-            if (ac.getInProgressTaskClusters().isEmpty()) {
-                jsm.notifyActivityClusterComplete(ac);
-            }
-            return;
-        }
-
-        startTasks(taskAttemptMap);
-    }
-
-    private PriorityQueue<RankedRunnableTaskCluster> findRunnableTaskClusters() {
-        TaskCluster[] taskClusters = ac.getTaskClusters();
-
-        Map<TaskCluster, Integer> runnabilityRanks = new HashMap<TaskCluster, Integer>();
-        for (TaskCluster tc : taskClusters) {
-            // Start search at TCs that produce no outputs (sinks)
-            if (tc.getProducedPartitions().isEmpty()) {
-                assignRunnabilityRank(tc, runnabilityRanks);
-            }
-        }
-
-        PriorityQueue<RankedRunnableTaskCluster> result = new PriorityQueue<RankedRunnableTaskCluster>();
-        for (Map.Entry<TaskCluster, Integer> e : runnabilityRanks.entrySet()) {
-            TaskCluster tc = e.getKey();
-            int rank = e.getValue();
-            if (rank >= 0 && rank < Integer.MAX_VALUE) {
-                result.add(new RankedRunnableTaskCluster(rank, tc));
-            }
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Ranked TCs: " + result);
-        }
-        return result;
-    }
-
-    /*
-     * Runnability rank has the following semantics
-     * Rank(Running TaskCluster || Completed TaskCluster) = -1
-     * Rank(Runnable TaskCluster depending on completed TaskClusters) = 0
-     * Rank(Runnable TaskCluster) = max(Rank(Dependent TaskClusters)) + 1
-     * Rank(Non-schedulable TaskCluster) = MAX_VALUE 
-     */
-    private int assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Integer> runnabilityRank) {
-        if (runnabilityRank.containsKey(goal)) {
-            return runnabilityRank.get(goal);
-        }
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
-        if (lastAttempt != null) {
-            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
-                    || lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
-                int rank = -1;
-                runnabilityRank.put(goal, rank);
-                return rank;
-            }
-        }
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
-        JobRun jobRun = ac.getJobRun();
-        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
-        int maxInputRank = -1;
-        for (PartitionId pid : goal.getRequiredPartitions()) {
-            int rank = -1;
-            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
-            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
-            PartitionState maxState = pmm.getMaximumAvailableState(pid);
-            if (PartitionState.COMMITTED.equals(maxState)
-                    || (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish())) {
-                rank = -1;
-            } else {
-                rank = assignRunnabilityRank(ac.getPartitionProducingTaskClusterMap().get(pid), runnabilityRank);
-                if (rank >= 0 && cPolicy.consumerWaitsForProducerToFinish()) {
-                    rank = Integer.MAX_VALUE;
-                }
-            }
-            maxInputRank = Math.max(maxInputRank, rank);
-        }
-        int rank = maxInputRank < Integer.MAX_VALUE ? maxInputRank + 1 : Integer.MAX_VALUE;
-        runnabilityRank.put(goal, rank);
-        return rank;
-    }
-
-    private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
-        Executor executor = ccs.getExecutor();
-        JobRun jobRun = ac.getJobRun();
-        final UUID jobId = jobRun.getJobId();
-        final JobActivityGraph jag = jobRun.getJobActivityGraph();
-        final String appName = jag.getApplicationName();
-        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = ac.getConnectorPolicyMap();
-        for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
-            String nodeId = e.getKey();
-            final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
-            final NodeControllerState node = ccs.getNodeMap().get(nodeId);
-            if (node != null) {
-                node.getActiveJobIds().add(jobRun.getJobId());
-                jobRun.getParticipatingNodeIds().add(nodeId);
-                executor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
-                                    taskDescriptors, connectorPolicies, null);
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                });
-            }
-        }
-    }
-
-    private void abortTaskCluster(TaskClusterAttempt tcAttempt) throws HyracksException {
-        Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
-        Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
-        for (TaskAttempt ta : tcAttempt.getTaskAttempts()) {
-            TaskAttemptId taId = ta.getTaskAttemptId();
-            abortTaskIds.add(taId);
-            if (ta.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
-                ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
-                List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
-                if (abortTaskAttempts == null) {
-                    abortTaskAttempts = new ArrayList<TaskAttemptId>();
-                    abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
-                }
-                abortTaskAttempts.add(taId);
-            }
-        }
-        JobRun jobRun = ac.getJobRun();
-        final UUID jobId = jobRun.getJobId();
-        for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
-            final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
-            final List<TaskAttemptId> abortTaskAttempts = e.getValue();
-            if (node != null) {
-                LOGGER.info("Aborting: " + abortTaskAttempts + " at " + e.getKey());
-                ccs.getExecutor().execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            node.getNodeController().abortTasks(jobId, abortTaskAttempts);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                });
-            }
-        }
-        ac.getInProgressTaskClusters().remove(tcAttempt.getTaskCluster());
-        TaskCluster tc = tcAttempt.getTaskCluster();
-        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
-        pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
-        pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
-    }
-
-    public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException {
-        TaskAttemptId taId = ta.getTaskAttemptId();
-        TaskCluster tc = ta.getTaskState().getTaskCluster();
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
-        if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
-            TaskAttempt.TaskStatus taStatus = ta.getStatus();
-            if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
-                ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
-                abortTaskCluster(lastAttempt);
-                lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
-                abortDoomedTaskClusters();
-                notifyTaskClusterFailure(lastAttempt, exception);
-            } else {
-                LOGGER.warning("Spurious task failure notification: " + taId + " Current state = " + taStatus);
-            }
-        } else {
-            LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt);
-        }
-    }
-
-    private void abortDoomedTaskClusters() throws HyracksException {
-        TaskCluster[] taskClusters = ac.getTaskClusters();
-
-        Set<TaskCluster> doomedTaskClusters = new HashSet<TaskCluster>();
-        for (TaskCluster tc : taskClusters) {
-            // Start search at TCs that produce no outputs (sinks)
-            if (tc.getProducedPartitions().isEmpty()) {
-                findDoomedTaskClusters(tc, doomedTaskClusters);
-            }
-        }
-
-        for (TaskCluster tc : doomedTaskClusters) {
-            TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
-            abortTaskCluster(tca);
-            tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
-        }
-    }
-
-    private boolean findDoomedTaskClusters(TaskCluster tc, Set<TaskCluster> doomedTaskClusters) {
-        if (doomedTaskClusters.contains(tc)) {
-            return true;
-        }
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
-        if (lastAttempt != null) {
-            switch (lastAttempt.getStatus()) {
-                case ABORTED:
-                case FAILED:
-                    return true;
-
-                case COMPLETED:
-                    return false;
-            }
-        }
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
-        JobRun jobRun = ac.getJobRun();
-        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
-        boolean doomed = false;
-        for (PartitionId pid : tc.getRequiredPartitions()) {
-            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
-            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
-            PartitionState maxState = pmm.getMaximumAvailableState(pid);
-            if (maxState == null
-                    || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) {
-                if (findDoomedTaskClusters(ac.getPartitionProducingTaskClusterMap().get(pid), doomedTaskClusters)) {
-                    doomed = true;
-                }
-            }
-        }
-        if (doomed) {
-            doomedTaskClusters.add(tc);
-        }
-        return doomed;
-    }
-
-    public void abort() throws HyracksException {
-        TaskCluster[] taskClusters = ac.getTaskClusters();
-        for (TaskCluster tc : taskClusters) {
-            List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
-            if (!tcAttempts.isEmpty()) {
-                TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
-                if (tcAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
-                    abortTaskCluster(tcAttempt);
-                    tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
-                }
-            }
-        }
-    }
-
-    public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
-        TaskCluster tc = tcAttempt.getTaskCluster();
-        if (tcAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
-            abort();
-            ac.getJobRun().getStateMachine().notifyActivityClusterFailure(ac, exception);
-            return;
-        }
-        Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
-        try {
-            assignTaskLocations(tc, taskAttemptMap);
-        } catch (HyracksException e) {
-            abort();
-            ac.getJobRun().getStateMachine().notifyActivityClusterFailure(ac, e);
-            return;
-        }
-        startTasks(taskAttemptMap);
-    }
-
-    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
-        for (TaskCluster tc : ac.getTaskClusters()) {
-            TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
-            if (lastTaskClusterAttempt != null
-                    && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
-                            .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
-                boolean abort = false;
-                for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts()) {
-                    if (deadNodes.contains(ta.getNodeId())) {
-                        ta.setStatus(TaskAttempt.TaskStatus.FAILED, new HyracksException("Node " + ta.getNodeId()
-                                + " failed"));
-                        abort = true;
-                    }
-                }
-                if (abort) {
-                    abortTaskCluster(lastTaskClusterAttempt);
-                }
-            }
-        }
-        abortDoomedTaskClusters();
-    }
-
-    public boolean canMakeProgress() {
-        return true;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobRunStateMachine.java
deleted file mode 100644
index ce2770a..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobRunStateMachine.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.scheduler;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
-import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
-
-public class JobRunStateMachine {
-    private final ClusterControllerService ccs;
-
-    private final JobRun jobRun;
-
-    private final Set<ActivityCluster> completedClusters;
-
-    private final Set<ActivityCluster> inProgressClusters;
-
-    private PartitionConstraintSolver solver;
-
-    private ActivityCluster rootActivityCluster;
-
-    public JobRunStateMachine(ClusterControllerService ccs, JobRun jobRun) {
-        this.ccs = ccs;
-        this.jobRun = jobRun;
-        completedClusters = new HashSet<ActivityCluster>();
-        inProgressClusters = new HashSet<ActivityCluster>();
-    }
-
-    public PartitionConstraintSolver getSolver() {
-        return solver;
-    }
-
-    private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
-        if (completedClusters.contains(candidate) || frontier.contains(candidate)
-                || inProgressClusters.contains(candidate)) {
-            return;
-        }
-        boolean runnable = true;
-        for (ActivityCluster s : candidate.getDependencies()) {
-            if (!completedClusters.contains(s)) {
-                runnable = false;
-                findRunnableActivityClusters(frontier, s);
-            }
-        }
-        if (runnable && candidate != rootActivityCluster) {
-            frontier.add(candidate);
-        }
-    }
-
-    private void findRunnableActivityClusters(Set<ActivityCluster> frontier) {
-        findRunnableActivityClusters(frontier, rootActivityCluster);
-    }
-
-    public void schedule() throws HyracksException {
-        try {
-            solver = new PartitionConstraintSolver();
-            final JobActivityGraph jag = jobRun.getJobActivityGraph();
-            final ICCApplicationContext appCtx = ccs.getApplicationMap().get(jag.getApplicationName());
-            JobSpecification spec = jag.getJobSpecification();
-            final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
-            final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
-                @Override
-                public void addConstraint(Constraint constraint) {
-                    contributedConstraints.add(constraint);
-                }
-            };
-            PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
-                @Override
-                public void visit(IOperatorDescriptor op) {
-                    op.contributeSchedulingConstraints(acceptor, jag, appCtx);
-                }
-            });
-            PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
-                @Override
-                public void visit(IConnectorDescriptor conn) {
-                    conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
-                }
-            });
-            contributedConstraints.addAll(spec.getUserConstraints());
-            solver.addConstraints(contributedConstraints);
-
-            ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(jobRun);
-            rootActivityCluster = acgb.inferStages(jag);
-            startRunnableActivityClusters();
-        } catch (Exception e) {
-            e.printStackTrace();
-            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.FAILURE, e));
-            throw new HyracksException(e);
-        }
-    }
-
-    private void startRunnableActivityClusters() throws HyracksException {
-        Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
-        findRunnableActivityClusters(runnableClusters);
-        if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
-            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
-            return;
-        }
-        for (ActivityCluster ac : runnableClusters) {
-            inProgressClusters.add(ac);
-            TaskClusterBuilder tcb = new TaskClusterBuilder(jobRun, solver);
-            tcb.buildTaskClusters(ac);
-            ActivityClusterStateMachine acsm = new ActivityClusterStateMachine(ccs, this, ac);
-            ac.setStateMachine(acsm);
-            acsm.schedule();
-        }
-    }
-
-    public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException {
-        for (ActivityCluster ac2 : inProgressClusters) {
-            abortActivityCluster(ac2);
-        }
-        jobRun.setStatus(JobStatus.FAILURE, exception);
-    }
-
-    private void abortActivityCluster(ActivityCluster ac) throws HyracksException {
-        ac.getStateMachine().abort();
-    }
-
-    public void notifyActivityClusterComplete(ActivityCluster ac) throws HyracksException {
-        completedClusters.add(ac);
-        inProgressClusters.remove(ac);
-        startRunnableActivityClusters();
-    }
-
-    public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
-        jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
-        for (ActivityCluster ac : completedClusters) {
-            ac.getStateMachine().notifyNodeFailures(deadNodes);
-        }
-        for (ActivityCluster ac : inProgressClusters) {
-            ac.getStateMachine().notifyNodeFailures(deadNodes);
-        }
-        for (ActivityCluster ac : inProgressClusters) {
-            ActivityClusterStateMachine acsm = ac.getStateMachine();
-            if (acsm.canMakeProgress()) {
-                acsm.schedule();
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 44aa1b8..14f5d5a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -14,19 +14,655 @@
  */
 package edu.uci.ics.hyracks.control.cc.scheduler;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
+import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
+import edu.uci.ics.hyracks.control.cc.job.Task;
+import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
+import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
+import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
 public class JobScheduler {
+    private static final Logger LOGGER = Logger.getLogger(JobScheduler.class.getName());
+
     private final ClusterControllerService ccs;
 
-    public JobScheduler(ClusterControllerService ccs) {
+    private final JobRun jobRun;
+
+    private final Set<ActivityCluster> completedClusters;
+
+    private final Set<ActivityCluster> inProgressClusters;
+
+    private final PartitionConstraintSolver solver;
+
+    private Set<ActivityCluster> rootActivityClusters;
+
+    public JobScheduler(ClusterControllerService ccs, JobRun jobRun) {
         this.ccs = ccs;
+        this.jobRun = jobRun;
+        completedClusters = new HashSet<ActivityCluster>();
+        inProgressClusters = new HashSet<ActivityCluster>();
+        solver = new PartitionConstraintSolver();
     }
 
-    public void notifyJobCreation(JobRun run) throws HyracksException {
-        JobRunStateMachine jsm = new JobRunStateMachine(ccs, run);
-        run.setStateMachine(jsm);
+    public void startJob() throws HyracksException {
+        analyze();
+        startRunnableActivityClusters();
+    }
+
+    private void analyze() throws HyracksException {
+        final JobActivityGraph jag = jobRun.getJobActivityGraph();
+        final ICCApplicationContext appCtx = ccs.getApplicationMap().get(jag.getApplicationName());
+        JobSpecification spec = jag.getJobSpecification();
+        final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
+        final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
+            @Override
+            public void addConstraint(Constraint constraint) {
+                contributedConstraints.add(constraint);
+            }
+        };
+        PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+            @Override
+            public void visit(IOperatorDescriptor op) {
+                op.contributeSchedulingConstraints(acceptor, jag, appCtx);
+            }
+        });
+        PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+            @Override
+            public void visit(IConnectorDescriptor conn) {
+                conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
+            }
+        });
+        contributedConstraints.addAll(spec.getUserConstraints());
+        solver.addConstraints(contributedConstraints);
+
+        ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(jobRun);
+        rootActivityClusters = acgb.inferActivityClusters(jag);
+    }
+
+    private void findPrerequisiteActivities(Set<ActivityId> prereqs, ActivityCluster ac) {
+        TaskCluster[] taskClusters = ac.getTaskClusters();
+        if (taskClusters == null) {
+            JobActivityGraph jag = jobRun.getJobActivityGraph();
+            for (ActivityId aid : ac.getActivities()) {
+                Set<ActivityId> deps = jag.getBlocked2BlockerMap().get(aid);
+                prereqs.addAll(deps);
+            }
+        } else {
+
+        }
+    }
+
+    private void findRunnableActivityClusters(Set<ActivityCluster> frontier, Set<ActivityCluster> roots) {
+        for (ActivityCluster root : roots) {
+            findRunnableActivityClusters(frontier, root);
+        }
+    }
+
+    private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
+        if (frontier.contains(candidate) || inProgressClusters.contains(candidate)) {
+            return;
+        }
+        boolean depsComplete = true;
+        Set<ActivityId> prereqs = new HashSet<ActivityId>();
+        findPrerequisiteActivities(prereqs, candidate);
+        Map<ActivityCluster, Set<ActivityId>> prereqACMap = new HashMap<ActivityCluster, Set<ActivityId>>();
+        for (ActivityId aid : prereqs) {
+            ActivityCluster ac = jobRun.getActivityClusterMap().get(aid);
+            Set<ActivityId> pSet = prereqACMap.get(ac);
+            if (pSet == null) {
+                pSet = new HashSet<ActivityId>();
+                prereqACMap.put(ac, pSet);
+            }
+            pSet.add(aid);
+        }
+        for (ActivityCluster depAC : candidate.getDependencies()) {
+            if (!completedClusters.contains(depAC)) {
+                depsComplete = false;
+                findRunnableActivityClusters(frontier, depAC);
+            } else {
+                Set<ActivityId> pSet = prereqACMap.get(depAC);
+                if (pSet != null) {
+
+                }
+                // TODO
+            }
+        }
+        if (depsComplete) {
+            if (runnable && candidate != rootActivityCluster) {
+                frontier.add(candidate);
+            }
+
+        }
+    }
+
+    private void startRunnableActivityClusters() throws HyracksException {
+        Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
+        findRunnableActivityClusters(runnableClusters, rootActivityClusters);
+        if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
+            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
+            return;
+        }
+        for (ActivityCluster ac : runnableClusters) {
+            inProgressClusters.add(ac);
+            TaskClusterBuilder tcb = new TaskClusterBuilder(jobRun, solver);
+            tcb.buildTaskClusters(ac);
+            startRunnableTaskClusters(ac);
+        }
+    }
+
+    private void abortActivityCluster(ActivityCluster ac) {
+        TaskCluster[] taskClusters = ac.getTaskClusters();
+        for (TaskCluster tc : taskClusters) {
+            List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
+            if (!tcAttempts.isEmpty()) {
+                TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
+                if (tcAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+                    abortTaskCluster(tcAttempt, ac);
+                    tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+                }
+            }
+        }
+        inProgressClusters.remove(ac);
+    }
+
+    private void assignTaskLocations(ActivityCluster ac, TaskCluster tc,
+            Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
+        JobRun jobRun = ac.getJobRun();
+        JobActivityGraph jag = jobRun.getJobActivityGraph();
+        Task[] tasks = tc.getTasks();
+        List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
+        int attempts = tcAttempts.size();
+        TaskClusterAttempt tcAttempt = new TaskClusterAttempt(tc, attempts);
+        TaskAttempt[] taskAttempts = new TaskAttempt[tasks.length];
+        Map<TaskId, LValueConstraintExpression> locationMap = new HashMap<TaskId, LValueConstraintExpression>();
+        for (int i = 0; i < tasks.length; ++i) {
+            Task ts = tasks[i];
+            TaskId tid = ts.getTaskId();
+            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(),
+                    tid.getPartition()), attempts), ts);
+            taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
+            locationMap.put(tid,
+                    new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition()));
+            taskAttempts[i] = taskAttempt;
+        }
+        tcAttempt.setTaskAttempts(taskAttempts);
+        solver.solve(locationMap.values());
+        for (int i = 0; i < tasks.length; ++i) {
+            Task ts = tasks[i];
+            TaskId tid = ts.getTaskId();
+            TaskAttempt taskAttempt = taskAttempts[i];
+            ActivityId aid = tid.getActivityId();
+            Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(aid);
+            String nodeId = null;
+            if (blockers != null) {
+                for (ActivityId blocker : blockers) {
+                    nodeId = findLocationOfBlocker(jobRun, jag, new TaskId(blocker, tid.getPartition()));
+                    if (nodeId != null) {
+                        break;
+                    }
+                }
+            }
+            Set<String> liveNodes = ccs.getNodeMap().keySet();
+            if (nodeId == null) {
+                LValueConstraintExpression pLocationExpr = locationMap.get(tid);
+                Object location = solver.getValue(pLocationExpr);
+                if (location == null) {
+                    // pick any
+                    nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt())
+                            % liveNodes.size()];
+                } else if (location instanceof String) {
+                    nodeId = (String) location;
+                } else if (location instanceof String[]) {
+                    for (String choice : (String[]) location) {
+                        if (liveNodes.contains(choice)) {
+                            nodeId = choice;
+                            break;
+                        }
+                    }
+                    if (nodeId == null) {
+                        throw new HyracksException("No satisfiable location found for "
+                                + taskAttempt.getTaskAttemptId());
+                    }
+                } else {
+                    throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "("
+                            + location.getClass() + ")");
+                }
+            }
+            if (nodeId == null) {
+                throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId());
+            }
+            if (!liveNodes.contains(nodeId)) {
+                throw new HyracksException("Node " + nodeId + " not live");
+            }
+            taskAttempt.setNodeId(nodeId);
+            taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null);
+            List<TaskAttemptDescriptor> tads = taskAttemptMap.get(nodeId);
+            if (tads == null) {
+                tads = new ArrayList<TaskAttemptDescriptor>();
+                taskAttemptMap.put(nodeId, tads);
+            }
+            ActivityPartitionDetails apd = ts.getActivityPartitionDetails();
+            tads.add(new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(), apd.getPartitionCount(), apd
+                    .getInputPartitionCounts(), apd.getOutputPartitionCounts()));
+        }
+        tcAttempt.initializePendingTaskCounter();
+        tcAttempts.add(tcAttempt);
+        tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
+        ac.getInProgressTaskClusters().add(tc);
+    }
+
+    private static String findLocationOfBlocker(JobRun jobRun, JobActivityGraph jag, TaskId tid) {
+        ActivityId blockerAID = tid.getActivityId();
+        ActivityCluster blockerAC = jobRun.getActivityClusterMap().get(blockerAID);
+        Task[] blockerTasks = blockerAC.getTaskMap().get(blockerAID);
+        List<TaskClusterAttempt> tcAttempts = blockerTasks[tid.getPartition()].getTaskCluster().getAttempts();
+        if (tcAttempts == null || tcAttempts.isEmpty()) {
+            return null;
+        }
+        TaskClusterAttempt lastTCA = tcAttempts.get(tcAttempts.size() - 1);
+        for (TaskAttempt ta : lastTCA.getTaskAttempts()) {
+            TaskId blockerTID = ta.getTaskAttemptId().getTaskId();
+            if (tid.equals(blockerTID)) {
+                return ta.getNodeId();
+            }
+        }
+        return null;
+    }
+
+    private static TaskClusterAttempt findLastTaskClusterAttempt(TaskCluster tc) {
+        List<TaskClusterAttempt> attempts = tc.getAttempts();
+        if (!attempts.isEmpty()) {
+            return attempts.get(attempts.size() - 1);
+        }
+        return null;
+    }
+
+    public void notifyTaskComplete(TaskAttempt ta, ActivityCluster ac) throws HyracksException {
+        TaskAttemptId taId = ta.getTaskAttemptId();
+        TaskCluster tc = ta.getTaskState().getTaskCluster();
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+        if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
+            TaskAttempt.TaskStatus taStatus = ta.getStatus();
+            if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
+                ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
+                if (lastAttempt.decrementPendingTasksCounter() == 0) {
+                    lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+                    ac.getInProgressTaskClusters().remove(tc);
+                    startRunnableTaskClusters(ac);
+                }
+            } else {
+                LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
+            }
+        } else {
+            LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
+        }
+    }
+
+    private void startRunnableTaskClusters(ActivityCluster ac) throws HyracksException {
+        Map<TaskCluster, Runnability> runnabilityMap = computeRunnabilityRanks(ac);
+
+        PriorityQueue<RankedRunnableTaskCluster> queue = new PriorityQueue<RankedRunnableTaskCluster>();
+        for (Map.Entry<TaskCluster, Runnability> e : runnabilityMap.entrySet()) {
+            TaskCluster tc = e.getKey();
+            Runnability runnability = e.getValue();
+            assert runnability.getTag() != Runnability.Tag.UNSATISFIED_PREREQUISITES;
+            if (runnability.getTag() != Runnability.Tag.RUNNABLE) {
+                continue;
+            }
+            int priority = runnability.getPriority();
+            if (priority >= 0 && priority < Integer.MAX_VALUE) {
+                queue.add(new RankedRunnableTaskCluster(priority, tc));
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Ranked TCs: " + queue);
+        }
+
+        Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
+        for (RankedRunnableTaskCluster rrtc : queue) {
+            TaskCluster tc = rrtc.getTaskCluster();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Found runnable TC: " + tc);
+                List<TaskClusterAttempt> attempts = tc.getAttempts();
+                LOGGER.info("Attempts so far:" + attempts.size());
+                for (TaskClusterAttempt tcAttempt : attempts) {
+                    LOGGER.info("Status: " + tcAttempt.getStatus());
+                }
+            }
+            assignTaskLocations(ac, tc, taskAttemptMap);
+        }
+
+        if (taskAttemptMap.isEmpty()) {
+            if (ac.getInProgressTaskClusters().isEmpty()) {
+                completedClusters.add(ac);
+                inProgressClusters.remove(ac);
+                startRunnableActivityClusters();
+            }
+            return;
+        }
+
+        startTasks(ac, taskAttemptMap);
+    }
+
+    private void abortJob(Exception exception) {
+        for (ActivityCluster ac : inProgressClusters) {
+            abortActivityCluster(ac);
+        }
+        jobRun.setStatus(JobStatus.FAILURE, exception);
+    }
+
+    private Map<TaskCluster, Runnability> computeRunnabilityRanks(ActivityCluster ac) {
+        TaskCluster[] taskClusters = ac.getTaskClusters();
+
+        Map<TaskCluster, Runnability> runnabilityMap = new HashMap<TaskCluster, Runnability>();
+        for (TaskCluster tc : taskClusters) {
+            // Start search at TCs that produce no outputs (sinks)
+            if (tc.getProducedPartitions().isEmpty()) {
+                assignRunnabilityRank(tc, runnabilityMap, ac);
+            }
+        }
+        return runnabilityMap;
+    }
+
+    /*
+     * Runnability rank has the following semantics
+     * Runnability(Runnable TaskCluster depending on completed TaskClusters) = {RUNNABLE, 0}
+     * Runnability(Runnable TaskCluster) = max(Rank(Dependent TaskClusters)) + 1
+     * Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _} 
+     */
+    private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap,
+            ActivityCluster ac) {
+        if (runnabilityMap.containsKey(goal)) {
+            return runnabilityMap.get(goal);
+        }
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
+        if (lastAttempt != null) {
+            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+                Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
+                runnabilityMap.put(goal, runnability);
+                return runnability;
+            }
+            if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+                Runnability runnability = new Runnability(Runnability.Tag.RUNNING, Integer.MIN_VALUE);
+                runnabilityMap.put(goal, runnability);
+                return runnability;
+            }
+        }
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
+        JobRun jobRun = ac.getJobRun();
+        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+        Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
+        for (PartitionId pid : goal.getRequiredPartitions()) {
+            Runnability runnability;
+            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
+            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
+            PartitionState maxState = pmm.getMaximumAvailableState(pid);
+            if (PartitionState.COMMITTED.equals(maxState)) {
+                runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
+            } else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) {
+                runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
+            } else {
+                runnability = assignRunnabilityRank(ac.getPartitionProducingTaskClusterMap().get(pid), runnabilityMap,
+                        ac);
+                if (runnability.getTag() == Runnability.Tag.RUNNABLE && runnability.getPriority() > 0
+                        && cPolicy.consumerWaitsForProducerToFinish()) {
+                    runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
+                }
+            }
+            aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability);
+        }
+        runnabilityMap.put(goal, aggregateRunnability);
+        return aggregateRunnability;
+    }
+
+    private void startTasks(ActivityCluster ac, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
+            throws HyracksException {
+        Executor executor = ccs.getExecutor();
+        JobRun jobRun = ac.getJobRun();
+        final UUID jobId = jobRun.getJobId();
+        final JobActivityGraph jag = jobRun.getJobActivityGraph();
+        final String appName = jag.getApplicationName();
+        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = ac.getConnectorPolicyMap();
+        for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
+            String nodeId = e.getKey();
+            final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
+            final NodeControllerState node = ccs.getNodeMap().get(nodeId);
+            if (node != null) {
+                node.getActiveJobIds().add(jobRun.getJobId());
+                jobRun.getParticipatingNodeIds().add(nodeId);
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
+                                    taskDescriptors, connectorPolicies, null);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+        }
+    }
+
+    private void abortTaskCluster(TaskClusterAttempt tcAttempt, ActivityCluster ac) {
+        Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
+        Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
+        for (TaskAttempt ta : tcAttempt.getTaskAttempts()) {
+            TaskAttemptId taId = ta.getTaskAttemptId();
+            abortTaskIds.add(taId);
+            if (ta.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
+                ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
+                List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
+                if (abortTaskAttempts == null) {
+                    abortTaskAttempts = new ArrayList<TaskAttemptId>();
+                    abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
+                }
+                abortTaskAttempts.add(taId);
+            }
+        }
+        JobRun jobRun = ac.getJobRun();
+        final UUID jobId = jobRun.getJobId();
+        for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
+            final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
+            final List<TaskAttemptId> abortTaskAttempts = e.getValue();
+            if (node != null) {
+                LOGGER.info("Aborting: " + abortTaskAttempts + " at " + e.getKey());
+                ccs.getExecutor().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            node.getNodeController().abortTasks(jobId, abortTaskAttempts);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+        }
+        ac.getInProgressTaskClusters().remove(tcAttempt.getTaskCluster());
+        TaskCluster tc = tcAttempt.getTaskCluster();
+        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+        pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
+        pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+    }
+
+    private void abortDoomedTaskClusters(ActivityCluster ac) throws HyracksException {
+        TaskCluster[] taskClusters = ac.getTaskClusters();
+
+        Set<TaskCluster> doomedTaskClusters = new HashSet<TaskCluster>();
+        for (TaskCluster tc : taskClusters) {
+            // Start search at TCs that produce no outputs (sinks)
+            if (tc.getProducedPartitions().isEmpty()) {
+                findDoomedTaskClusters(tc, ac, doomedTaskClusters);
+            }
+        }
+
+        for (TaskCluster tc : doomedTaskClusters) {
+            TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+            abortTaskCluster(tca, ac);
+            tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+        }
+    }
+
+    private boolean findDoomedTaskClusters(TaskCluster tc, ActivityCluster ac, Set<TaskCluster> doomedTaskClusters) {
+        if (doomedTaskClusters.contains(tc)) {
+            return true;
+        }
+        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+        if (lastAttempt != null) {
+            switch (lastAttempt.getStatus()) {
+                case ABORTED:
+                case FAILED:
+                    return true;
+
+                case COMPLETED:
+                    return false;
+            }
+        }
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
+        JobRun jobRun = ac.getJobRun();
+        PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+        boolean doomed = false;
+        for (PartitionId pid : tc.getRequiredPartitions()) {
+            ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
+            IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
+            PartitionState maxState = pmm.getMaximumAvailableState(pid);
+            if (maxState == null
+                    || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) {
+                if (findDoomedTaskClusters(ac.getPartitionProducingTaskClusterMap().get(pid), ac, doomedTaskClusters)) {
+                    doomed = true;
+                }
+            }
+        }
+        if (doomed) {
+            doomedTaskClusters.add(tc);
+        }
+        return doomed;
+    }
+
+    /**
+     * Indicates that a single task attempt has encountered a failure.
+     * 
+     * @param ta
+     *            - Failed Task Attempt
+     * @param ac
+     *            - Activity Cluster that owns this Task
+     * @param exception
+     *            - Cause of the failure
+     */
+    public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, Exception exception) {
+        try {
+            TaskAttemptId taId = ta.getTaskAttemptId();
+            TaskCluster tc = ta.getTaskState().getTaskCluster();
+            TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+            if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
+                TaskAttempt.TaskStatus taStatus = ta.getStatus();
+                if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
+                    ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
+                    abortTaskCluster(lastAttempt, ac);
+                    lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
+                    abortDoomedTaskClusters(ac);
+                    if (lastAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
+                        abortActivityCluster(ac);
+                        return;
+                    }
+                    startRunnableTaskClusters(ac);
+                } else {
+                    LOGGER.warning("Spurious task failure notification: " + taId + " Current state = " + taStatus);
+                }
+            } else {
+                LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = "
+                        + lastAttempt);
+            }
+        } catch (Exception e) {
+            abortJob(e);
+        }
+    }
+
+    /**
+     * Indicates that the provided set of nodes have left the cluster.
+     * 
+     * @param deadNodes
+     *            - Set of failed nodes
+     */
+    public void notifyNodeFailures(Set<String> deadNodes) {
+        try {
+            jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
+            for (ActivityCluster ac : jobRun.getActivityClusters()) {
+                TaskCluster[] taskClusters = ac.getTaskClusters();
+                if (taskClusters != null) {
+                    for (TaskCluster tc : taskClusters) {
+                        TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
+                        if (lastTaskClusterAttempt != null
+                                && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
+                                        .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+                            boolean abort = false;
+                            for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts()) {
+                                assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
+                                if (deadNodes.contains(ta.getNodeId())) {
+                                    ta.setStatus(TaskAttempt.TaskStatus.FAILED,
+                                            new HyracksException("Node " + ta.getNodeId() + " failed"));
+                                    TaskId tid = ta.getTaskAttemptId().getTaskId();
+                                    ActivityId aid = tid.getActivityId();
+
+                                    abort = true;
+                                }
+                            }
+                            if (abort) {
+                                abortTaskCluster(lastTaskClusterAttempt, ac);
+                            }
+                        }
+                    }
+                    abortDoomedTaskClusters(ac);
+                }
+            }
+            startRunnableActivityClusters();
+        } catch (Exception e) {
+            abortJob(e);
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java
new file mode 100644
index 0000000..f990806
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/Runnability.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+public final class Runnability {
+    private final Tag tag;
+
+    private final int priority;
+
+    public Runnability(Tag tag, int priority) {
+        this.tag = tag;
+        this.priority = priority;
+    }
+
+    public Tag getTag() {
+        return tag;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public enum Tag {
+        COMPLETED,
+        NOT_RUNNABLE,
+        RUNNABLE,
+        RUNNING,
+        UNSATISFIED_PREREQUISITES,
+    }
+
+    public static Runnability getWorstCase(Runnability r1, Runnability r2) {
+        switch (r1.tag) {
+            case COMPLETED:
+                switch (r2.tag) {
+                    case COMPLETED:
+                    case NOT_RUNNABLE:
+                    case RUNNABLE:
+                    case RUNNING:
+                    case UNSATISFIED_PREREQUISITES:
+                        return r2;
+                }
+                break;
+
+            case NOT_RUNNABLE:
+                switch (r2.tag) {
+                    case COMPLETED:
+                    case NOT_RUNNABLE:
+                    case RUNNABLE:
+                    case RUNNING:
+                        return r1;
+
+                    case UNSATISFIED_PREREQUISITES:
+                        return r2;
+                }
+                break;
+
+            case RUNNABLE:
+                switch (r2.tag) {
+                    case COMPLETED:
+                        return r1;
+
+                    case RUNNING:
+                        return r1.priority > 0 ? r1 : new Runnability(Tag.RUNNABLE, 1);
+
+                    case NOT_RUNNABLE:
+                    case UNSATISFIED_PREREQUISITES:
+                        return r2;
+
+                    case RUNNABLE:
+                        return r1.priority > r2.priority ? r1 : r2;
+                }
+                break;
+
+            case RUNNING:
+                switch (r2.tag) {
+                    case COMPLETED:
+                    case RUNNING:
+                        return r1;
+
+                    case NOT_RUNNABLE:
+                    case UNSATISFIED_PREREQUISITES:
+                        return r2;
+
+                    case RUNNABLE:
+                        return r2.priority > 0 ? r2 : new Runnability(Tag.RUNNABLE, 1);
+                }
+                break;
+
+            case UNSATISFIED_PREREQUISITES:
+                switch (r2.tag) {
+                    case COMPLETED:
+                    case NOT_RUNNABLE:
+                    case RUNNABLE:
+                    case RUNNING:
+                    case UNSATISFIED_PREREQUISITES:
+                        return r1;
+                }
+                break;
+        }
+        throw new IllegalArgumentException("Could not aggregate: " + r1 + " and " + r2);
+    }
+
+    @Override
+    public String toString() {
+        return "{" + tag + ", " + priority + "}";
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/pom.xml b/hyracks-control-common/pom.xml
index 27169cf..1c47b80 100644
--- a/hyracks-control-common/pom.xml
+++ b/hyracks-control-common/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-control-common</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,7 +27,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-control-nc/pom.xml b/hyracks-control-nc/pom.xml
index ba375a4..0e920f7 100644
--- a/hyracks-control-nc/pom.xml
+++ b/hyracks-control-nc/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-control-nc</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -33,7 +33,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-common</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-dataflow-common/pom.xml b/hyracks-dataflow-common/pom.xml
index fda93fb..705acc8 100644
--- a/hyracks-dataflow-common/pom.xml
+++ b/hyracks-dataflow-common/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-dataflow-common</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,7 +27,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-dataflow-hadoop/pom.xml b/hyracks-dataflow-hadoop/pom.xml
index da62260..250c1af 100644
--- a/hyracks-dataflow-hadoop/pom.xml
+++ b/hyracks-dataflow-hadoop/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-dataflow-hadoop</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,14 +27,14 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
@@ -54,7 +54,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   </dependencies>
diff --git a/hyracks-dataflow-std/pom.xml b/hyracks-dataflow-std/pom.xml
index 3539048..fa01942 100644
--- a/hyracks-dataflow-std/pom.xml
+++ b/hyracks-dataflow-std/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-dataflow-std</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,14 +27,14 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-documentation/pom.xml b/hyracks-documentation/pom.xml
index 07af861..6708f8e 100644
--- a/hyracks-documentation/pom.xml
+++ b/hyracks-documentation/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-documentation</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
diff --git a/hyracks-examples/btree-example/btreeapp/pom.xml b/hyracks-examples/btree-example/btreeapp/pom.xml
index 2f920f1..1668d5c 100644
--- a/hyracks-examples/btree-example/btreeapp/pom.xml
+++ b/hyracks-examples/btree-example/btreeapp/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
   <artifactId>btreeapp</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>btree-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -51,7 +51,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks.examples.btree</groupId>
   		<artifactId>btreehelper</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   </dependencies>
diff --git a/hyracks-examples/btree-example/btreeclient/pom.xml b/hyracks-examples/btree-example/btreeclient/pom.xml
index 31e316a..8089e3c 100644
--- a/hyracks-examples/btree-example/btreeclient/pom.xml
+++ b/hyracks-examples/btree-example/btreeclient/pom.xml
@@ -2,31 +2,31 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
   <artifactId>btreeclient</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>btree-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <dependencies>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-am-btree</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks.examples.btree</groupId>
   		<artifactId>btreehelper</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-examples/btree-example/btreehelper/pom.xml b/hyracks-examples/btree-example/btreehelper/pom.xml
index 3d25728..230fb42 100644
--- a/hyracks-examples/btree-example/btreehelper/pom.xml
+++ b/hyracks-examples/btree-example/btreehelper/pom.xml
@@ -2,31 +2,31 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
   <artifactId>btreehelper</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>btree-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <dependencies>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-am-btree</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   </dependencies>
diff --git a/hyracks-examples/btree-example/pom.xml b/hyracks-examples/btree-example/pom.xml
index 45d9837..1a13d52 100644
--- a/hyracks-examples/btree-example/pom.xml
+++ b/hyracks-examples/btree-example/pom.xml
@@ -2,13 +2,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>btree-example</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-examples</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <modules>
diff --git a/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml b/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
index 9e21cff..73ea9de 100644
--- a/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
+++ b/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.compat</groupId>
   <artifactId>hadoopcompatapp</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>hadoop-compat-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -140,13 +140,13 @@
      <dependency>
         <groupId>edu.uci.ics.hyracks.examples.compat</groupId>
         <artifactId>hadoopcompathelper</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <scope>compile</scope>
      </dependency>
      <dependency>
         <groupId>edu.uci.ics.hyracks.examples.compat</groupId>
   	    <artifactId>hadoopcompatclient</artifactId>
-  	    <version>0.1.6-SNAPSHOT</version>
+  	    <version>0.1.7-SNAPSHOT</version>
   	    <type>jar</type>
   	    <scope>test</scope>
      </dependency>
diff --git a/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml b/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
index b6f222c..9057707 100644
--- a/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
+++ b/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
@@ -2,25 +2,25 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.compat</groupId>
   <artifactId>hadoopcompatclient</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>hadoop-compat-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <dependencies>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks.examples.compat</groupId>
   		<artifactId>hadoopcompathelper</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml b/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
index 49090ce..4946545 100644
--- a/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
+++ b/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
@@ -2,25 +2,25 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.compat</groupId>
   <artifactId>hadoopcompathelper</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>hadoop-compat-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <dependencies>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   </dependencies>
diff --git a/hyracks-examples/hadoop-compat-example/pom.xml b/hyracks-examples/hadoop-compat-example/pom.xml
index b2a08ee..ccbe3d5 100644
--- a/hyracks-examples/hadoop-compat-example/pom.xml
+++ b/hyracks-examples/hadoop-compat-example/pom.xml
@@ -2,13 +2,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>hadoop-compat-example</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-examples</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <modules>
@@ -29,7 +29,7 @@
       <dependency>
          <groupId>edu.uci.ics.hyracks</groupId>
          <artifactId>hyracks-hadoop-compat</artifactId>
-         <version>0.1.6-SNAPSHOT</version>
+         <version>0.1.7-SNAPSHOT</version>
          <type>jar</type>
          <scope>compile</scope>
       </dependency>
diff --git a/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks-examples/hyracks-integration-tests/pom.xml
index cd0d285..8ef1706 100644
--- a/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>hyracks-integration-tests</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-examples</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -42,42 +42,42 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-cc</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-nc</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-am-btree</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-am-invertedindex</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-test-support</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>test</scope>
   	</dependency>
diff --git a/hyracks-examples/pom.xml b/hyracks-examples/pom.xml
index 5281140..61bc127 100644
--- a/hyracks-examples/pom.xml
+++ b/hyracks-examples/pom.xml
@@ -2,13 +2,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-examples</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <modules>
diff --git a/hyracks-examples/text-example/pom.xml b/hyracks-examples/text-example/pom.xml
index b7bbd8d..3f7390f 100644
--- a/hyracks-examples/text-example/pom.xml
+++ b/hyracks-examples/text-example/pom.xml
@@ -2,13 +2,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>text-example</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-examples</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <modules>
diff --git a/hyracks-examples/text-example/textapp/pom.xml b/hyracks-examples/text-example/textapp/pom.xml
index 4323a27..463d8a5 100644
--- a/hyracks-examples/text-example/textapp/pom.xml
+++ b/hyracks-examples/text-example/textapp/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.text</groupId>
   <artifactId>textapp</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>text-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -135,13 +135,13 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks.examples.text</groupId>
   		<artifactId>texthelper</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks.examples.text</groupId>
   		<artifactId>textclient</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>test</scope>
   	</dependency>
diff --git a/hyracks-examples/text-example/textclient/pom.xml b/hyracks-examples/text-example/textclient/pom.xml
index b85914a..e373f0d 100644
--- a/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks-examples/text-example/textclient/pom.xml
@@ -2,25 +2,25 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.text</groupId>
   <artifactId>textclient</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>text-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <dependencies>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks.examples.text</groupId>
   		<artifactId>texthelper</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-examples/text-example/texthelper/pom.xml b/hyracks-examples/text-example/texthelper/pom.xml
index 0030339..99a4c4b 100644
--- a/hyracks-examples/text-example/texthelper/pom.xml
+++ b/hyracks-examples/text-example/texthelper/pom.xml
@@ -2,25 +2,25 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.text</groupId>
   <artifactId>texthelper</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>text-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <dependencies>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   </dependencies>
diff --git a/hyracks-examples/tpch-example/pom.xml b/hyracks-examples/tpch-example/pom.xml
index bab6832..872eead 100644
--- a/hyracks-examples/tpch-example/pom.xml
+++ b/hyracks-examples/tpch-example/pom.xml
@@ -2,13 +2,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>tpch-example</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-examples</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <modules>
diff --git a/hyracks-examples/tpch-example/tpchapp/pom.xml b/hyracks-examples/tpch-example/tpchapp/pom.xml
index ab879fe..a2fbf54 100644
--- a/hyracks-examples/tpch-example/tpchapp/pom.xml
+++ b/hyracks-examples/tpch-example/tpchapp/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
   <artifactId>tpchapp</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>tpch-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -51,7 +51,7 @@
     <dependency>
         <groupId>edu.uci.ics.hyracks</groupId>
         <artifactId>hyracks-dataflow-std</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <scope>compile</scope>
     </dependency>
   </dependencies>
diff --git a/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks-examples/tpch-example/tpchclient/pom.xml
index 9204326..977825f 100644
--- a/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -2,19 +2,19 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
   <artifactId>tpchclient</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>tpch-example</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <dependencies>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   </dependencies>
diff --git a/hyracks-hadoop-compat/pom.xml b/hyracks-hadoop-compat/pom.xml
index 5be7e5d..44eb842 100644
--- a/hyracks-hadoop-compat/pom.xml
+++ b/hyracks-hadoop-compat/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-hadoop-compat</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -79,7 +79,7 @@
     <dependency>
     	<groupId>edu.uci.ics.hyracks</groupId>
     	<artifactId>hyracks-dataflow-hadoop</artifactId>
-    	<version>0.1.6-SNAPSHOT</version>
+    	<version>0.1.7-SNAPSHOT</version>
     	<type>jar</type>
     	<scope>compile</scope>
     </dependency>
diff --git a/hyracks-server/pom.xml b/hyracks-server/pom.xml
index 153aa57..4f967da 100644
--- a/hyracks-server/pom.xml
+++ b/hyracks-server/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-server</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -61,14 +61,14 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-cc</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-nc</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-storage-am-btree/pom.xml b/hyracks-storage-am-btree/pom.xml
index b75276a..c51ce70 100644
--- a/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-storage-am-btree/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-storage-am-btree</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,28 +27,28 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-common</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-nc</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
diff --git a/hyracks-storage-am-invertedindex/pom.xml b/hyracks-storage-am-invertedindex/pom.xml
index f79101c..78a053c 100644
--- a/hyracks-storage-am-invertedindex/pom.xml
+++ b/hyracks-storage-am-invertedindex/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-storage-am-invertedindex</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,35 +27,35 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-common</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-nc</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-am-btree</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
diff --git a/hyracks-storage-common/pom.xml b/hyracks-storage-common/pom.xml
index 5f66a0e..639e1a7 100644
--- a/hyracks-storage-common/pom.xml
+++ b/hyracks-storage-common/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-storage-common</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,7 +27,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-test-support/pom.xml b/hyracks-test-support/pom.xml
index 50ff242..bd3a9c5 100644
--- a/hyracks-test-support/pom.xml
+++ b/hyracks-test-support/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-test-support</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,19 +27,19 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-nc</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-common</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-am-btree</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-tests/hyracks-storage-am-btree-test/pom.xml b/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
index 42abe33..bd13afb 100644
--- a/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
+++ b/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-storage-am-btree-test</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-tests</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -34,20 +34,20 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-nc</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-am-btree</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-test-support</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>test</scope>
   	</dependency>
diff --git a/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml b/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
index 83e9b9e..ec2625f 100644
--- a/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
+++ b/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-storage-am-invertedindex-test</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-tests</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,20 +27,20 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-nc</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-am-invertedindex</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-test-support</artifactId>
-  		<version>0.1.6-SNAPSHOT</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>test</scope>
   	</dependency>
diff --git a/hyracks-tests/pom.xml b/hyracks-tests/pom.xml
index e3a2807..4a391bc 100644
--- a/hyracks-tests/pom.xml
+++ b/hyracks-tests/pom.xml
@@ -2,13 +2,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-tests</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <modules>
diff --git a/pom.xml b/pom.xml
index f38ccc8..b6cc875 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,7 +2,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks</artifactId>
-  <version>0.1.6-SNAPSHOT</version>
+  <version>0.1.7-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <build>