Added scheduling and fault recovery. Completely removed the Java-based JobManager; the JOL-based JOLJobManagerImpl is now the sole job manager.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@20 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
new file mode 100644
index 0000000..c14345f
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+public class ChoiceLocationConstraint extends LocationConstraint {
+    private static final long serialVersionUID = 1L;
+
+    private LocationConstraint[] choices;
+
+    public ChoiceLocationConstraint(LocationConstraint... choices) {
+        this.choices = choices;
+    }
+
+    public LocationConstraint[] getChoices() {
+        return choices;
+    }
+
+    @Override
+    public ConstraintType getConstraintType() {
+        return ConstraintType.CHOICE;
+    }
+}
\ No newline at end of file
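
The new ChoiceLocationConstraint is what makes rescheduling possible: instead of pinning a partition to one node, a job can offer the scheduler several acceptable nodes. A minimal client-side sketch, assuming AbsoluteLocationConstraint takes the node id in its constructor and PartitionConstraint wraps an array of LocationConstraints (both defined outside this patch):

    // One partition that the scheduler may place on either NC1 or NC2 (node ids hypothetical).
    LocationConstraint choice = new ChoiceLocationConstraint(
        new AbsoluteLocationConstraint("NC1"),
        new AbsoluteLocationConstraint("NC2"));
    op.setPartitionConstraint(new PartitionConstraint(new LocationConstraint[] { choice }));
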
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
index e88a1b2..d34b6b2 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
@@ -21,6 +21,7 @@
 
     public enum ConstraintType {
         ABSOLUTE,
+        CHOICE
     }
 
     public abstract ConstraintType getConstraintType();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
index 81b16d9..6461a05 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
@@ -33,9 +33,20 @@
     public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
         StageletStatistics statistics) throws Exception;
 
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
     public void nodeHeartbeat(String id) throws Exception;
 
     /*
+     * Client Application Control Methods.
+     */
+    public void createApplication(String appName) throws Exception;
+
+    public void startApplication(String appName) throws Exception;
+
+    public void destroyApplication(String appName) throws Exception;
+
+    /*
      * Client Job Control methods.
      */
     public UUID createJob(JobSpecification jobSpec) throws Exception;
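
The three application control methods sketch out a deployment lifecycle on the cluster controller; note that the implementations added to ClusterControllerService below are still empty stubs. Hypothetical client-side usage, mirroring the RMI binding the CC already publishes under IClusterController.class.getName():

    IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
    cc.createApplication("my-app"); // application name is made up
    cc.startApplication("my-app");
    // ... submit and run jobs ...
    cc.destroyApplication("my-app");
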
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
index d349edf..264254e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
@@ -21,6 +21,7 @@
 
 import edu.uci.ics.hyracks.api.comm.Endpoint;
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.config.NCConfig;
@@ -33,10 +34,11 @@
     public NodeCapability getNodeCapability() throws Exception;
 
     public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
-        Set<ActivityNodeId> activities) throws Exception;
+        Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception;
 
-    public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> activities,
-        Map<PortInstanceId, Endpoint> globalPortMap) throws Exception;
+    public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Map<ActivityNodeId, Set<Integer>> tasks,
+        Map<OperatorDescriptorId, Set<Integer>> opPartitions, Map<PortInstanceId, Endpoint> globalPortMap)
+        throws Exception;
 
     public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception;
 
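
Both initialization phases now take Map<ActivityNodeId, Set<Integer>> instead of Set<ActivityNodeId>: the cluster controller tells each node exactly which partitions of each activity it owns, rather than having the node rediscover placement from op.getPartitions() (deleted from IOperatorDescriptor in this same patch). Shape of the new arguments, with hypothetical ids:

    Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
    tasks.put(scanActivityId, new HashSet<Integer>(Arrays.asList(0, 2))); // this node runs partitions 0 and 2
    // opPartitions lists every partition of each operator cluster-wide, so connectors
    // can size their fan-in/fan-out without consulting descriptor state.
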
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 600fd41..b3feef8 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -50,11 +50,15 @@
      *            Endpoint writer factory.
      * @param index
      *            ordinal index of the data producer partition.
+     * @param nProducerPartitions
+     *            Number of partitions of the producing operator.
+     * @param nConsumerPartitions
+     *            Number of partitions of the consuming operator.
      * @return data writer.
      * @throws Exception
      */
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException;
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
 
     /**
      * Factory method to create the receive side reader that reads data from this connector.
@@ -67,11 +71,15 @@
      *            Connection Demultiplexer
      * @param index
      *            ordinal index of the data consumer partition
+     * @param nProducerPartitions
+     *            Number of partitions of the producing operator.
+     * @param nConsumerPartitions
+     *            Number of partitions of the consuming operator.
      * @return data reader
      * @throws HyracksDataException
      */
     public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-            int index) throws HyracksDataException;
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
 
     /**
      * Translate this connector descriptor to JSON.
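
Threading nProducerPartitions/nConsumerPartitions through the factory methods means a connector no longer has to infer fan-out from the plan. A sketch of a send side for a hash-partitioning connector under the new signature; edwFactory.createDataWriter(int) is assumed to hand back the channel for one consumer partition, and HashingFrameWriter is a hypothetical router:

    @Override
    public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan,
        IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions,
        int nConsumerPartitions) throws HyracksDataException {
        IFrameWriter[] consumers = new IFrameWriter[nConsumerPartitions];
        for (int i = 0; i < nConsumerPartitions; ++i) {
            consumers[i] = edwFactory.createDataWriter(i); // one channel per consumer partition
        }
        return new HashingFrameWriter(ctx, consumers); // routes each tuple to hash(key) % nConsumerPartitions
    }
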
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index 17a0fc5..ad51370 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -65,21 +65,6 @@
     public void setPartitionConstraint(PartitionConstraint partitionConstraint);
 
     /**
-     * Returns the final partition locations selected for scheduling. These are decided by Hyracks such that they satisfy the partition constraints.
-     * 
-     * @return array indicating number and node ids of the nodes to schedule the operator runtimes.
-     */
-    public String[] getPartitions();
-
-    /**
-     * Sets the partition locations.
-     * 
-     * @param partitions
-     *            node ids to schedule the operators.
-     */
-    public void setPartitions(String[] partitions);
-
-    /**
      * Gets the output record descriptor
      * 
      * @return Array of RecordDescriptor, one per output.
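
Removing getPartitions()/setPartitions() takes the last piece of mutable scheduling state off the operator descriptor. Placement used to be computed once and written back onto the descriptor, which made re-planning after a node failure impossible; now descriptors carry only declarative constraints, and the chosen assignment lives solely in the scheduler's tables (see the operatorlocation changes in JOLJobManagerImpl below):

    // Before: op.setPartitions(new String[] { "NC1", "NC2" }); ... op.getPartitions();
    // After:  op.setPartitionConstraint(pc); // the scheduler decides, and may re-decide, placement
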
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
index 081c3f2..3d3652a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
@@ -23,8 +23,8 @@
     @Option(name = "-http-port", usage = "Sets the http port for the admin console")
     public int httpPort;
 
-    @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in seconds (default: 10)")
-    public int heartbeatPeriod = 10;
+    @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
+    public int heartbeatPeriod = 10000;
 
     @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
     public int maxHeartbeatLapsePeriods = 5;
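
The heartbeat period moves from seconds to milliseconds, and the matching "* 1000" scaling is dropped at both use sites (the DeadNodeSweeper schedule below and HeartbeatTask in NodeControllerService). Assuming the sweeper simply counts consecutive missed periods, the defaults give roughly:

    long deadNodeTimeoutMs = ccConfig.heartbeatPeriod * ccConfig.maxHeartbeatLapsePeriods; // 10000 * 5 = 50 seconds
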
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index 4bbd96a..af60240 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -52,6 +52,7 @@
 import edu.uci.ics.hyracks.api.controller.INodeController;
 import edu.uci.ics.hyracks.api.controller.NodeParameters;
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -85,17 +86,12 @@
     public ClusterControllerService(CCConfig ccConfig) throws Exception {
         this.ccConfig = ccConfig;
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
-        if (ccConfig.useJOL) {
-            Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL
-                : new HashSet<DebugLevel>();
-            jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
-            jobManager = new JOLJobManagerImpl(this, jolRuntime);
-        } else {
-            jobManager = new JobManagerImpl(this);
-        }
+        Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL : new HashSet<DebugLevel>();
+        jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
+        jobManager = new JOLJobManagerImpl(this, jolRuntime);
         taskExecutor = Executors.newCachedThreadPool();
         webServer = new WebServer(new Handler[] {
-            getAdminConsoleHandler()
+            getAdminConsoleHandler(), getApplicationDataHandler()
         });
         this.timer = new Timer(true);
     }
@@ -107,7 +103,7 @@
         registry.rebind(IClusterController.class.getName(), this);
         webServer.setPort(ccConfig.httpPort);
         webServer.start();
-        timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod * 1000);
+        timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod);
         LOGGER.log(Level.INFO, "Started ClusterControllerService");
     }
 
@@ -139,6 +135,7 @@
             nodeRegistry.put(id, state);
         }
         nodeController.notifyRegistration(this);
+        jobManager.registerNode(id);
         LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
         NodeParameters params = new NodeParameters();
         params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
@@ -184,6 +181,11 @@
     }
 
     @Override
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
+        jobManager.notifyStageletFailure(jobId, stageId, attempt, nodeId);
+    }
+
+    @Override
     public JobStatus getJobStatus(UUID jobId) throws Exception {
         return jobManager.getJobStatus(jobId);
     }
@@ -235,6 +237,17 @@
         return handler;
     }
 
+    private Handler getApplicationDataHandler() {
+        ContextHandler handler = new ContextHandler("/applications");
+        handler.setHandler(new AbstractHandler() {
+            @Override
+            public void handle(String target, Request baseRequest, HttpServletRequest request,
+                HttpServletResponse response) throws IOException, ServletException {
+            }
+        });
+        return handler;
+    }
+
     @Override
     public Map<String, INodeController> getRegistry() throws Exception {
         Map<String, INodeController> map = new HashMap<String, INodeController>();
@@ -273,6 +286,9 @@
                 }
                 for (String deadNode : deadNodes) {
                     try {
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Killing node: " + deadNode);
+                        }
                         killNode(deadNode);
                     } catch (Exception e) {
                         e.printStackTrace();
@@ -298,7 +314,8 @@
         final Semaphore installComplete = new Semaphore(remoteOps.length);
         final List<Exception> errors = new Vector<Exception>();
         for (final RemoteOp<T> remoteOp : remoteOps) {
-            final INodeController node = lookupNode(remoteOp.getNodeId()).getNodeController();
+            NodeControllerState nodeState = lookupNode(remoteOp.getNodeId());
+            final INodeController node = nodeState.getNodeController();
 
             installComplete.acquire();
             Runnable remoteRunner = new Runnable() {
@@ -334,21 +351,23 @@
         private JobPlan plan;
         private UUID stageId;
         private int attempt;
-        private Set<ActivityNodeId> tasks;
+        private Map<ActivityNodeId, Set<Integer>> tasks;
+        private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
 
         public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
-            Set<ActivityNodeId> tasks) {
+            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
             this.nodeId = nodeId;
             this.jobId = jobId;
             this.plan = plan;
             this.stageId = stageId;
             this.attempt = attempt;
             this.tasks = tasks;
+            this.opPartitions = opPartitions;
         }
 
         @Override
         public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
-            return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks);
+            return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks, opPartitions);
         }
 
         @Override
@@ -367,22 +386,25 @@
         private UUID jobId;
         private JobPlan plan;
         private UUID stageId;
-        private Set<ActivityNodeId> tasks;
+        private Map<ActivityNodeId, Set<Integer>> tasks;
+        private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
         private Map<PortInstanceId, Endpoint> globalPortMap;
 
-        public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> tasks,
+        public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId,
+            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
             Map<PortInstanceId, Endpoint> globalPortMap) {
             this.nodeId = nodeId;
             this.jobId = jobId;
             this.plan = plan;
             this.stageId = stageId;
             this.tasks = tasks;
+            this.opPartitions = opPartitions;
             this.globalPortMap = globalPortMap;
         }
 
         @Override
         public Void execute(INodeController node) throws Exception {
-            node.initializeJobletPhase2(jobId, plan, stageId, tasks, globalPortMap);
+            node.initializeJobletPhase2(jobId, plan, stageId, tasks, opPartitions, globalPortMap);
             return null;
         }
 
@@ -481,6 +503,32 @@
         }
     }
 
+    static class JobCompleteNotifier implements RemoteOp<Void> {
+        private String nodeId;
+        private UUID jobId;
+
+        public JobCompleteNotifier(String nodeId, UUID jobId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.cleanUpJob(jobId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Cleaning Up";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
     static class PortMapMergingAccumulator implements
         Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
         Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
@@ -495,4 +543,19 @@
             return portMap;
         }
     }
+
+    @Override
+    public void createApplication(String appName) throws Exception {
+
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+
+    }
+
+    @Override
+    public void startApplication(String appName) throws Exception {
+
+    }
 }
\ No newline at end of file
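
All of the RemoteOp helpers (Phase1Installer, Phase2Installer, Phase3Installer, StageStarter, and the new JobCompleteNotifier) are driven through runRemote, which fans a batch out on the task executor and optionally folds the results through an Accumulator. Condensed from the JOL callback below, the per-stage sequence is (error handling omitted):

    Map<PortInstanceId, Endpoint> portMap =
        ccs.runRemote(p1is, new ClusterControllerService.PortMapMergingAccumulator()); // phase 1: create endpoints
    ccs.runRemote(p2is, null); // phase 2: wire endpoints using the merged global port map
    ccs.runRemote(p3is, null); // phase 3: commit joblet initialization
    ccs.runRemote(ss, null);   // start the stage on every participating node
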
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
index a13dc7e..e364423 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
@@ -28,14 +28,16 @@
 
     public void start(UUID jobId) throws Exception;
 
-    public void advanceJob(JobControl jobControlImpl) throws Exception;
-
     public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
         StageletStatistics statistics) throws Exception;
 
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
     public JobStatus getJobStatus(UUID jobId);
 
     public JobStatistics waitForCompletion(UUID jobId) throws Exception;
 
     public void notifyNodeFailure(String nodeId) throws Exception;
+
+    public void registerNode(String nodeId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java
deleted file mode 100644
index e9bffba..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package edu.uci.ics.hyracks.controller.clustercontroller;
-
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.job.JobStage;
-
-public interface IJobPlanner {
-    public Set<String> plan(JobControl jobControl, JobStage stage) throws Exception;
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 8917959..4041850 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.controller.clustercontroller;
 
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +33,7 @@
 import jol.types.table.TableName;
 import edu.uci.ics.hyracks.api.comm.Endpoint;
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
@@ -74,10 +76,18 @@
 
     private final JobStartTable jobStartTable;
 
+    private final JobCleanUpTable jobCleanUpTable;
+
+    private final JobCleanUpCompleteTable jobCleanUpCompleteTable;
+
     private final StartMessageTable startMessageTable;
 
     private final StageletCompleteTable stageletCompleteTable;
 
+    private final StageletFailureTable stageletFailureTable;
+
+    private final AvailableNodesTable availableNodesTable;
+
     private final FailedNodesTable failedNodesTable;
 
     private final AbortMessageTable abortMessageTable;
@@ -94,8 +104,12 @@
         this.acTable = new ActivityConnectionTable(jolRuntime);
         this.abTable = new ActivityBlockedTable(jolRuntime);
         this.jobStartTable = new JobStartTable();
+        this.jobCleanUpTable = new JobCleanUpTable(jolRuntime);
+        this.jobCleanUpCompleteTable = new JobCleanUpCompleteTable();
         this.startMessageTable = new StartMessageTable(jolRuntime);
         this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
+        this.stageletFailureTable = new StageletFailureTable(jolRuntime);
+        this.availableNodesTable = new AvailableNodesTable(jolRuntime);
         this.failedNodesTable = new FailedNodesTable(jolRuntime);
         this.abortMessageTable = new AbortMessageTable(jolRuntime);
         this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
@@ -108,8 +122,12 @@
         jolRuntime.catalog().register(acTable);
         jolRuntime.catalog().register(abTable);
         jolRuntime.catalog().register(jobStartTable);
+        jolRuntime.catalog().register(jobCleanUpTable);
+        jolRuntime.catalog().register(jobCleanUpCompleteTable);
         jolRuntime.catalog().register(startMessageTable);
         jolRuntime.catalog().register(stageletCompleteTable);
+        jolRuntime.catalog().register(stageletFailureTable);
+        jolRuntime.catalog().register(availableNodesTable);
         jolRuntime.catalog().register(failedNodesTable);
         jolRuntime.catalog().register(abortMessageTable);
         jolRuntime.catalog().register(abortNotifyTable);
@@ -144,13 +162,40 @@
                             Integer attempt = (Integer) data[2];
                             JobPlan plan = (JobPlan) data[3];
                             Set<List> ts = (Set<List>) data[4];
+                            Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
+                            for (List t2 : ts) {
+                                Object[] t2Data = t2.toArray();
+                                Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                for (List l : activityInfoSet) {
+                                    Object[] lData = l.toArray();
+                                    ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                    Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
+                                    if (opParts == null) {
+                                        opParts = new HashSet<Integer>();
+                                        opPartitions.put(aid.getOperatorDescriptorId(), opParts);
+                                    }
+                                    opParts.add((Integer) lData[1]);
+                                }
+                            }
                             ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
                                 .size()];
                             int i = 0;
                             for (List t2 : ts) {
                                 Object[] t2Data = t2.toArray();
+                                Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+                                Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                for (List l : activityInfoSet) {
+                                    Object[] lData = l.toArray();
+                                    ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                    Set<Integer> aParts = tasks.get(aid);
+                                    if (aParts == null) {
+                                        aParts = new HashSet<Integer>();
+                                        tasks.put(aid, aParts);
+                                    }
+                                    aParts.add((Integer) lData[1]);
+                                }
                                 p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
-                                    plan, stageId, attempt, (Set<ActivityNodeId>) t2Data[1]);
+                                    plan, stageId, attempt, tasks, opPartitions);
                             }
                             Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
                                 new ClusterControllerService.PortMapMergingAccumulator());
@@ -163,8 +208,20 @@
                             i = 0;
                             for (List t2 : ts) {
                                 Object[] t2Data = t2.toArray();
+                                Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+                                Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                for (List l : activityInfoSet) {
+                                    Object[] lData = l.toArray();
+                                    ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                    Set<Integer> aParts = tasks.get(aid);
+                                    if (aParts == null) {
+                                        aParts = new HashSet<Integer>();
+                                        tasks.put(aid, aParts);
+                                    }
+                                    aParts.add((Integer) lData[1]);
+                                }
                                 p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, plan,
-                                    stageId, (Set<ActivityNodeId>) t2Data[1], globalPortMap);
+                                    stageId, tasks, opPartitions, globalPortMap);
                                 p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
                                     stageId);
                                 ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, stageId);
@@ -181,6 +238,37 @@
             }
         });
 
+        jobCleanUpTable.register(new JobCleanUpTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                try {
+                    synchronized (JOLJobManagerImpl.this) {
+                        for (Tuple t : tuples) {
+                            Object[] data = t.toArray();
+                            UUID jobId = (UUID) data[0];
+                            Set<String> ts = (Set<String>) data[1];
+                            ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
+                                .size()];
+                            int i = 0;
+                            for (String n : ts) {
+                                jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
+                            }
+                            ccs.runRemote(jcns, null);
+                            BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable.createTuple(jobId));
+                            jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
         abortMessageTable.register(new AbortMessageTable.Callback() {
             @Override
             public void deletion(TupleSet tuples) {
@@ -224,11 +312,6 @@
     }
 
     @Override
-    public void advanceJob(JobControl jobControlImpl) throws Exception {
-
-    }
-
-    @Override
     public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
         final UUID jobId = UUID.randomUUID();
 
@@ -270,16 +353,13 @@
         BasicTupleSet olTuples = new BasicTupleSet();
         for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
             IOperatorDescriptor od = e.getValue();
-            odTuples.add(OperatorDescriptorTable.createTuple(jobId, od));
             PartitionConstraint pc = od.getPartitionConstraint();
             LocationConstraint[] locationConstraints = pc.getLocationConstraints();
-            String[] partitions = new String[locationConstraints.length];
+            int nPartitions = locationConstraints.length;
+            odTuples.add(OperatorDescriptorTable.createTuple(jobId, nPartitions, od));
             for (int i = 0; i < locationConstraints.length; ++i) {
-                String nodeId = ((AbsoluteLocationConstraint) locationConstraints[i]).getLocationId();
-                olTuples.add(OperatorLocationTable.createTuple(jobId, od.getOperatorId(), nodeId));
-                partitions[i] = nodeId;
+                addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i]);
             }
-            od.setPartitions(partitions);
             od.contributeTaskGraph(gBuilder);
         }
 
@@ -303,6 +383,21 @@
         return jobId;
     }
 
+    private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i,
+        LocationConstraint locationConstraint) {
+        switch (locationConstraint.getConstraintType()) {
+            case ABSOLUTE:
+                String nodeId = ((AbsoluteLocationConstraint) locationConstraint).getLocationId();
+                olTuples.add(OperatorLocationTable.createTuple(jobId, opId, nodeId, i));
+                break;
+
+            case CHOICE:
+                for (LocationConstraint lc : ((ChoiceLocationConstraint) locationConstraint).getChoices()) {
+                    addLocationConstraintTuple(olTuples, jobId, opId, i, lc);
+                }
+        }
+    }
+
     @Override
     public JobStatus getJobStatus(UUID jobId) {
         synchronized (jobTable) {
@@ -320,6 +415,12 @@
 
     @Override
     public void notifyNodeFailure(String nodeId) throws Exception {
+        BasicTupleSet unavailableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, null, unavailableTuples);
+
+        jolRuntime.evaluate();
+
         BasicTupleSet failedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
 
         jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, failedTuples, null);
@@ -339,6 +440,17 @@
     }
 
     @Override
+    public synchronized void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId)
+        throws Exception {
+        BasicTupleSet sfTuples = new BasicTupleSet();
+        sfTuples.add(StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt));
+
+        jolRuntime.schedule(JOL_SCOPE, StageletFailureTable.TABLE_NAME, sfTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    @Override
     public synchronized void start(UUID jobId) throws Exception {
         BasicTupleSet jsTuples = new BasicTupleSet();
         jsTuples.add(JobStartTable.createTuple(jobId, System.currentTimeMillis()));
@@ -349,6 +461,15 @@
     }
 
     @Override
+    public void registerNode(String nodeId) throws Exception {
+        BasicTupleSet availableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, availableTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    @Override
     public JobStatistics waitForCompletion(UUID jobId) throws Exception {
         synchronized (jobTable) {
             Tuple jobTuple = null;
@@ -416,20 +537,20 @@
 
         @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
-            UUID.class, OperatorDescriptorId.class, IOperatorDescriptor.class
+            UUID.class, OperatorDescriptorId.class, Integer.class, IOperatorDescriptor.class
         };
 
         public OperatorDescriptorTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
 
-        static Tuple createTuple(UUID jobId, IOperatorDescriptor od) {
-            return new Tuple(jobId, od.getOperatorId(), od);
+        static Tuple createTuple(UUID jobId, int nPartitions, IOperatorDescriptor od) {
+            return new Tuple(jobId, od.getOperatorId(), nPartitions, od);
         }
     }
 
     /*
-     * declare(operatordescriptor, keys(0, 1), {JobId, ODId, OperatorDescriptor})
+     * declare(operatorlocation, keys(0, 1), {JobId, ODId, NodeId, Partition})
      */
     private static class OperatorLocationTable extends BasicTable {
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorlocation");
@@ -438,15 +559,15 @@
 
         @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
-            UUID.class, OperatorDescriptorId.class, String.class
+            UUID.class, OperatorDescriptorId.class, String.class, Integer.class
         };
 
         public OperatorLocationTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
 
-        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId) {
-            return new Tuple(jobId, opId, nodeId);
+        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition) {
+            return new Tuple(jobId, opId, nodeId, partition);
         }
     }
 
@@ -599,6 +720,44 @@
     }
 
     /*
+     * declare(jobcleanup, keys(0), {JobId, Set<NodeId>})
+     */
+    private static class JobCleanUpTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanup");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, Set.class
+        };
+
+        public JobCleanUpTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+    /*
+     * declare(jobcleanupcomplete, keys(0), {JobId})
+     */
+    private static class JobCleanUpCompleteTable extends EventTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanupcomplete");
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class
+        };
+
+        public JobCleanUpCompleteTable() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId) {
+            return new Tuple(jobId);
+        }
+    }
+
+    /*
      * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
      */
     private static class StageletCompleteTable extends BasicTable {
@@ -622,6 +781,50 @@
     }
 
     /*
+     * declare(stageletfailure, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+     */
+    private static class StageletFailureTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletfailure");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, UUID.class, String.class, Integer.class
+        };
+
+        public StageletFailureTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+            return new Tuple(jobId, stageId, nodeId, attempt);
+        }
+    }
+
+    /*
+     * declare(availablenodes, keys(0), {NodeId})
+     */
+    private static class AvailableNodesTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "availablenodes");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            String.class
+        };
+
+        public AvailableNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId) {
+            return new Tuple(nodeId);
+        }
+    }
+
+    /*
      * declare(failednodes, keys(0), {NodeId})
      */
     private static class FailedNodesTable extends BasicTable {
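
Fault recovery hangs off the new tables: availablenodes is kept current by registerNode and notifyNodeFailure, stageletfailure records failed attempts, and jobcleanup/jobcleanupcomplete drive the JobCompleteNotifier fan-out. Choice constraints are flattened by addLocationConstraintTuple into one operatorlocation tuple per candidate node for the same partition index, so the Overlog rules can presumably re-join against availablenodes and pick a surviving candidate on a retry. Illustrative expansion (node ids hypothetical):

    // ChoiceLocationConstraint(ABSOLUTE("NC1"), ABSOLUTE("NC2")) at partition index 0 becomes:
    //   operatorlocation(jobId, opId, "NC1", 0)
    //   operatorlocation(jobId, opId, "NC2", 0)
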
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java
deleted file mode 100644
index a7e2fc1..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.controller.clustercontroller;
-
-import java.rmi.RemoteException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-
-public class JobControl {
-    private static final long serialVersionUID = 1L;
-
-    private final IJobManager jobManager;
-
-    private final JobPlan jobPlan;
-
-    private final UUID jobId;
-
-    private final Map<UUID, StageProgress> stageProgressMap;
-
-    private final Set<UUID> completeStages;
-
-    private JobStatus jobStatus;
-
-    private JobStatistics jobStatistics;
-
-    public JobControl(IJobManager jobManager, JobPlan jobPlan) throws RemoteException {
-        this.jobManager = jobManager;
-        this.jobPlan = jobPlan;
-        jobId = UUID.randomUUID();
-        stageProgressMap = new HashMap<UUID, StageProgress>();
-        completeStages = new HashSet<UUID>();
-        jobStatus = JobStatus.INITIALIZED;
-        jobStatistics = new JobStatistics();
-    }
-
-    public JobPlan getJobPlan() {
-        return jobPlan;
-    }
-
-    public UUID getJobId() {
-        return jobId;
-    }
-
-    public synchronized JobStatus getJobStatus() {
-        return jobStatus;
-    }
-
-    public Set<UUID> getCompletedStages() {
-        return completeStages;
-    }
-
-    public void setStatus(JobStatus status) {
-        jobStatus = status;
-    }
-
-    public StageProgress getStageProgress(int stageId) {
-        return stageProgressMap.get(stageId);
-    }
-
-    public void setStageProgress(UUID stageId, StageProgress stageProgress) {
-        stageProgressMap.put(stageId, stageProgress);
-    }
-
-    public synchronized void notifyStageletComplete(UUID stageId, String nodeId, StageletStatistics ss)
-            throws Exception {
-        StageProgress stageProgress = stageProgressMap.get(stageId);
-        stageProgress.markNodeComplete(nodeId);
-        StageStatistics stageStatistics = stageProgress.getStageStatistics();
-        stageStatistics.addStageletStatistics(ss);
-        if (stageProgress.stageComplete()) {
-            jobStatistics.addStageStatistics(stageStatistics);
-            stageProgressMap.remove(stageId);
-            completeStages.add(stageId);
-            jobManager.advanceJob(this);
-        }
-    }
-
-    public synchronized JobStatistics waitForCompletion() throws Exception {
-        while (jobStatus != JobStatus.TERMINATED) {
-            wait();
-        }
-        return jobStatistics;
-    }
-
-    public synchronized void notifyJobComplete() {
-        jobStatus = JobStatus.TERMINATED;
-        notifyAll();
-    }
-
-    public JobStatistics getJobStatistics() {
-        return jobStatistics;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
deleted file mode 100644
index fc902b2..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.controller.clustercontroller;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStage;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-
-public class JobManagerImpl implements IJobManager {
-    private static final Logger LOGGER = Logger.getLogger(JobManagerImpl.class.getName());
-    private ClusterControllerService ccs;
-
-    private final Map<UUID, JobControl> jobMap;
-
-    private final IJobPlanner planner;
-
-    public JobManagerImpl(ClusterControllerService ccs) {
-        this.ccs = ccs;
-        jobMap = new HashMap<UUID, JobControl>();
-        planner = new NaiveJobPlannerImpl();
-    }
-
-    public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        JobPlanner planner = new JobPlanner();
-        JobControl jc = new JobControl(this, planner.plan(jobSpec, jobFlags));
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(jc.getJobPlan().toString());
-        }
-        jobMap.put(jc.getJobId(), jc);
-
-        return jc.getJobId();
-    }
-
-    public synchronized void start(UUID jobId) throws Exception {
-        JobControl jobControlImpl = jobMap.get(jobId);
-        LOGGER
-            .info("Starting job: " + jobControlImpl.getJobId() + ", Current status: " + jobControlImpl.getJobStatus());
-        if (jobControlImpl.getJobStatus() != JobStatus.INITIALIZED) {
-            return;
-        }
-        jobControlImpl.getJobStatistics().setStartTime(new Date());
-        jobControlImpl.setStatus(JobStatus.RUNNING);
-        schedule(jobControlImpl);
-    }
-
-    public synchronized void advanceJob(JobControl jobControlImpl) throws Exception {
-        schedule(jobControlImpl);
-    }
-
-    private void schedule(JobControl jobControlImpl) throws Exception {
-        JobPlan plan = jobControlImpl.getJobPlan();
-        JobStage endStage = plan.getEndStage();
-
-        Set<UUID> completedStages = jobControlImpl.getCompletedStages();
-        List<JobStage> runnableStages = new ArrayList<JobStage>();
-        findRunnableStages(endStage, runnableStages, completedStages, new HashSet<UUID>());
-        if (runnableStages.size() == 1 && runnableStages.get(0).getTasks().isEmpty()) {
-            LOGGER.info("Job " + jobControlImpl.getJobId() + " complete");
-            jobControlImpl.getJobStatistics().setEndTime(new Date());
-            cleanUp(jobControlImpl);
-            jobControlImpl.notifyJobComplete();
-        } else {
-            for (JobStage s : runnableStages) {
-                if (s.isStarted()) {
-                    continue;
-                }
-                startStage(jobControlImpl, s);
-            }
-        }
-    }
-
-    private void cleanUp(JobControl jc) {
-        jobMap.remove(jc.getJobId());
-        ccs.notifyJobComplete(jc.getJobId());
-    }
-
-    private void startStage(JobControl jc, JobStage stage) throws Exception {
-        stage.setStarted();
-        Set<String> candidateNodes = deploy(jc, stage);
-        for (String nodeId : candidateNodes) {
-            ccs.lookupNode(nodeId).getNodeController().startStage(jc.getJobId(), stage.getId());
-        }
-    }
-
-    private void findRunnableStages(JobStage s, List<JobStage> runnableStages, Set<UUID> completedStages, Set<UUID> seen) {
-        boolean runnable = true;
-        if (seen.contains(s.getId())) {
-            return;
-        }
-        seen.add(s.getId());
-        for (JobStage dep : s.getDependencies()) {
-            boolean depComplete = completedStages.contains(dep.getId());
-            runnable = runnable && depComplete;
-            if (!depComplete) {
-                findRunnableStages(dep, runnableStages, completedStages, seen);
-            }
-        }
-        if (runnable) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Runnable stage: " + s);
-            }
-            runnableStages.add(s);
-        }
-    }
-
-    private Set<String> deploy(JobControl jc, JobStage stage) throws Exception {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Deploying: " + stage);
-        }
-        UUID jobId = jc.getJobId();
-        JobPlan plan = jc.getJobPlan();
-        UUID stageId = stage.getId();
-        Set<String> participatingNodes = plan(jc, stage);
-        StageProgress stageProgress = new StageProgress(stage.getId());
-        stageProgress.addPendingNodes(participatingNodes);
-        ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[participatingNodes
-            .size()];
-        ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[participatingNodes
-            .size()];
-        ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[participatingNodes
-            .size()];
-        int i = 0;
-        for (String nodeId : participatingNodes) {
-            p1is[i++] = new ClusterControllerService.Phase1Installer(nodeId, jobId, plan, stageId, 0, stage.getTasks());
-        }
-        Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
-            new ClusterControllerService.PortMapMergingAccumulator());
-        i = 0;
-        for (String nodeId : participatingNodes) {
-            p2is[i++] = new ClusterControllerService.Phase2Installer(nodeId, jobId, plan, stageId, stage.getTasks(),
-                globalPortMap);
-        }
-        ccs.runRemote(p2is, null);
-        i = 0;
-        for (String nodeId : participatingNodes) {
-            p3is[i++] = new ClusterControllerService.Phase3Installer(nodeId, jobId, stageId);
-        }
-        ccs.runRemote(p3is, null);
-        jc.setStageProgress(stage.getId(), stageProgress);
-        return participatingNodes;
-    }
-
-    private Set<String> plan(JobControl jc, JobStage stage) throws Exception {
-        LOGGER.log(Level.INFO, String.valueOf(jc.getJobId()) + ": Planning");
-        Set<String> participatingNodes = planner.plan(jc, stage);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(stage + " Participating nodes: " + participatingNodes);
-        }
-        return participatingNodes;
-    }
-
-    @Override
-    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
-        StageletStatistics statistics) throws Exception {
-        JobControl jc = jobMap.get(jobId);
-        if (jc != null) {
-            jc.notifyStageletComplete(stageId, nodeId, statistics);
-        }
-    }
-
-    @Override
-    public synchronized JobStatus getJobStatus(UUID jobId) {
-        JobControl jc = jobMap.get(jobId);
-        return jc.getJobStatus();
-    }
-
-    @Override
-    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
-        JobControl jc;
-        synchronized (this) {
-            jc = jobMap.get(jobId);
-        }
-        if (jc != null) {
-            return jc.waitForCompletion();
-        }
-        return null;
-    }
-
-    @Override
-    public synchronized void notifyNodeFailure(String nodeId) {
-    }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java
deleted file mode 100644
index f3ccfee..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package edu.uci.ics.hyracks.controller.clustercontroller;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.job.JobStage;
-import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
-
-public class NaiveJobPlannerImpl implements IJobPlanner {
-    @Override
-    public Set<String> plan(JobControl jc, JobStage stage) throws Exception {
-        final Set<OperatorDescriptorId> opSet = new HashSet<OperatorDescriptorId>();
-        for (ActivityNodeId t : stage.getTasks()) {
-            opSet.add(jc.getJobPlan().getActivityNodeMap().get(t).getOwner().getOperatorId());
-        }
-
-        final Set<String> candidateNodes = new HashSet<String>();
-
-        IOperatorDescriptorVisitor visitor = new IOperatorDescriptorVisitor() {
-            @Override
-            public void visit(IOperatorDescriptor op) throws Exception {
-                if (!opSet.contains(op.getOperatorId())) {
-                    return;
-                }
-                String[] partitions = op.getPartitions();
-                if (partitions == null) {
-                    PartitionConstraint pc = op.getPartitionConstraint();
-                    LocationConstraint[] lcs = pc.getLocationConstraints();
-                    String[] assignment = new String[lcs.length];
-                    for (int i = 0; i < lcs.length; ++i) {
-                        String nodeId = ((AbsoluteLocationConstraint) lcs[i]).getLocationId();
-                        assignment[i] = nodeId;
-                    }
-                    op.setPartitions(assignment);
-                    partitions = assignment;
-                }
-                for (String p : partitions) {
-                    candidateNodes.add(p);
-                }
-            }
-        };
-
-        PlanUtils.visit(jc.getJobPlan().getJobSpecification(), visitor);
-        return candidateNodes;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
index 2269e1f..7f0a44e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
@@ -91,4 +91,13 @@
         stageletMap.remove(stageId);
         nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
     }
+
+    public void notifyStageletFailed(UUID stageId, int attempt) throws Exception {
+        stageletMap.remove(stageId);
+        nodeController.notifyStageFailed(jobId, stageId, attempt);
+    }
+
+    public NodeControllerService getNodeController() {
+        return nodeController;
+    }
 }
\ No newline at end of file
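
notifyStageletFailed completes the failure path from worker to scheduler: stagelet -> Joblet -> NodeControllerService -> IClusterController.notifyStageletFailure -> the stageletfailure table in JOL. The NodeControllerService side of this call is outside the diff; a plausible forwarding body, using whatever cluster-controller reference the service holds from registration (shown here as cc):

    public void notifyStageFailed(UUID jobId, UUID stageId, int attempt) throws Exception {
        cc.notifyStageletFailure(jobId, stageId, attempt, id); // id = this node controller's id
    }
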
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index 7b595b7..c202b3d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -49,6 +49,7 @@
 import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -111,7 +112,7 @@
         this.nodeParameters = cc.registerNode(this);
 
         // Schedule heartbeat generator.
-        timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod() * 1000);
+        timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
 
         LOGGER.log(Level.INFO, "Started NodeControllerService");
     }
@@ -124,7 +125,7 @@
     }
 
     @Override
-    public String getId() throws Exception {
+    public String getId() {
         return id;
     }
 
@@ -162,7 +163,7 @@
 
     @Override
     public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
-        Set<ActivityNodeId> activities) throws Exception {
+        Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception {
         LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
 
         final Joblet joblet = getLocalJoblet(jobId);
@@ -175,43 +176,43 @@
 
         List<Endpoint> endpointList = new ArrayList<Endpoint>();
 
-        for (ActivityNodeId hanId : activities) {
+        for (ActivityNodeId hanId : tasks.keySet()) {
             IActivityNode han = plan.getActivityNodeMap().get(hanId);
             if (LOGGER.isLoggable(Level.FINEST)) {
                 LOGGER.finest("Initializing " + hanId + " -> " + han);
             }
             IOperatorDescriptor op = han.getOwner();
             List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
-            String[] partitions = op.getPartitions();
-            for (int i = 0; i < partitions.length; ++i) {
-                String part = partitions[i];
-                if (id.equals(part)) {
-                    IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i);
-                    OperatorRunnable or = new OperatorRunnable(ctx, hon);
-                    stagelet.setOperator(op.getOperatorId(), i, or);
-                    if (inputs != null) {
-                        for (int j = 0; j < inputs.size(); ++j) {
-                            if (j >= 1) {
-                                throw new IllegalStateException();
-                            }
-                            IConnectorDescriptor conn = inputs.get(j);
-                            Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
-                            endpointList.add(endpoint);
-                            DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId,
-                                stageId);
-                            connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
-                            PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
-                                .getTaskInputMap().get(hanId).get(j), i);
-                            if (LOGGER.isLoggable(Level.FINEST)) {
-                                LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
-                            }
-                            portMap.put(piId, endpoint);
-                            IFrameReader reader = createReader(conn, drlf, i, plan, stagelet);
-                            or.setFrameReader(reader);
+            for (int i : tasks.get(hanId)) {
+                IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i);
+                OperatorRunnable or = new OperatorRunnable(ctx, hon);
+                stagelet.setOperator(op.getOperatorId(), i, or);
+                if (inputs != null) {
+                    for (int j = 0; j < inputs.size(); ++j) {
+                        if (j >= 1) {
+                            throw new IllegalStateException();
                         }
+                        IConnectorDescriptor conn = inputs.get(j);
+                        OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+                            .getOperatorId();
+                        OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+                            .getOperatorId();
+                        Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
+                        endpointList.add(endpoint);
+                        DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId, stageId);
+                        connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
+                        PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
+                            .getTaskInputMap().get(hanId).get(j), i);
+                        if (LOGGER.isLoggable(Level.FINEST)) {
+                            LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
+                        }
+                        portMap.put(piId, endpoint);
+                        IFrameReader reader = createReader(conn, drlf, i, plan, stagelet, opPartitions
+                            .get(producerOpId).size(), opPartitions.get(consumerOpId).size());
+                        or.setFrameReader(reader);
                     }
-                    honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
                 }
+                honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
             }
         }
 
@@ -221,8 +222,10 @@
     }
 
     private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
-        final int receiverIndex, JobPlan plan, final Stagelet stagelet) throws HyracksDataException {
-        final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex);
+        final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
+        throws HyracksDataException {
+        final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex, nProducerCount,
+            nConsumerCount);
 
         return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
             private int frameCount;
@@ -253,7 +256,8 @@
     }
 
     @Override
-    public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId, Set<ActivityNodeId> activities,
+    public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId,
+        Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
         final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
         LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
         final Joblet ji = getLocalJoblet(jobId);
@@ -264,37 +268,38 @@
 
         final JobSpecification spec = plan.getJobSpecification();
 
-        for (ActivityNodeId hanId : activities) {
+        for (ActivityNodeId hanId : tasks.keySet()) {
             IActivityNode han = plan.getActivityNodeMap().get(hanId);
             IOperatorDescriptor op = han.getOwner();
             List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
-            String[] partitions = op.getPartitions();
-            for (int i = 0; i < partitions.length; ++i) {
-                String part = partitions[i];
-                if (id.equals(part)) {
-                    OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
-                    if (outputs != null) {
-                        for (int j = 0; j < outputs.size(); ++j) {
-                            final IConnectorDescriptor conn = outputs.get(j);
-                            final int senderIndex = i;
-                            IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
-                                @Override
-                                public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
-                                    PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
-                                        Direction.INPUT, spec.getConsumerInputIndex(conn), index);
-                                    Endpoint ep = globalPortMap.get(piId);
-                                    if (LOGGER.isLoggable(Level.FINEST)) {
-                                        LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
-                                    }
-                                    return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
-                                        .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+            for (int i : tasks.get(hanId)) {
+                OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
+                if (outputs != null) {
+                    for (int j = 0; j < outputs.size(); ++j) {
+                        final IConnectorDescriptor conn = outputs.get(j);
+                        OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+                            .getOperatorId();
+                        OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+                            .getOperatorId();
+                        final int senderIndex = i;
+                        IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
+                            @Override
+                            public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
+                                PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
+                                    Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+                                Endpoint ep = globalPortMap.get(piId);
+                                if (LOGGER.isLoggable(Level.FINEST)) {
+                                    LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
                                 }
-                            };
-                            or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i));
-                        }
+                                return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
+                                    .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+                            }
+                        };
+                        or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i, opPartitions.get(
+                            producerOpId).size(), opPartitions.get(consumerOpId).size()));
                     }
-                    stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
                 }
+                stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
             }
         }
     }
@@ -373,6 +378,10 @@
         ccs.notifyStageletComplete(jobId, stageId, attempt, id, stats);
     }
 
+    public void notifyStageFailed(UUID jobId, UUID stageId, int attempt) throws Exception {
+        ccs.notifyStageletFailure(jobId, stageId, attempt, id);
+    }
+
     @Override
     public void notifyRegistration(IClusterController ccs) throws Exception {
         this.ccs = ccs;
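
The initializeJobletPhase1/Phase2 signature change is the heart of the scheduling rework. Instead of shipping a flat Set<ActivityNodeId> and letting every node filter op.getPartitions() against its own id, the cluster controller now hands each node exactly the (activity, partition set) pairs it owns, together with per-operator partition sets whose sizes become the connector fan-in/fan-out counts. A sketch of the new iteration shape, with type parameters A and O standing in for ActivityNodeId and OperatorDescriptorId:

    import java.util.Map;
    import java.util.Set;

    // A and O stand in for ActivityNodeId and OperatorDescriptorId; the real
    // method also builds push runtimes and endpoints per (activity, partition).
    class Phase1Sketch<A, O> {
        void initialize(Map<A, Set<Integer>> tasks, Map<O, Set<Integer>> opPartitions) {
            for (A activity : tasks.keySet()) {
                for (int partition : tasks.get(activity)) {
                    // exactly the partitions this node was assigned; no more
                    // filtering of op.getPartitions() against the local node id
                    System.out.println(activity + " -> partition " + partition);
                }
            }
            for (O opId : opPartitions.keySet()) {
                // these sizes feed the new nProducerPartitions / nConsumerPartitions
                // parameters of the connector factory methods
                System.out.println(opId + " runs " + opPartitions.get(opId).size() + " partitions");
            }
        }
    }
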
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index 8c0b830..73667ed 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -117,11 +117,14 @@
                         + opIId.getOperatorId() + ":" + opIId.getPartition());
                 } catch (Exception e) {
                     e.printStackTrace();
+                    notifyOperatorFailure(opIId);
+                    return; // a failed setup must not fall through to run()
                 }
                 try {
                     hon.run();
-                } finally {
                     notifyOperatorCompletion(opIId);
+                } catch (Exception e) {
+                    notifyOperatorFailure(opIId);
                 }
             }
         });
@@ -139,6 +142,15 @@
         }
     }
 
+    protected synchronized void notifyOperatorFailure(OperatorInstanceId opIId) {
+        abort();
+        try {
+            joblet.notifyStageletFailed(stageId, attempt);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
     private synchronized void waitUntilStarted() throws InterruptedException {
         while (!started && !abort) {
             wait();
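
The Stagelet change splits the operator lifecycle into a setup phase and a run phase with separate failure handling, replacing the old try/finally that reported completion even when run() threw. A compilable sketch of the intended control flow (Op stands in for the operator runtime; complete and fail abbreviate the joblet notifications):

    // Any exception in setup or in run() reports failure exactly once;
    // normal completion is reported from inside the same try as run().
    final class OperatorLifecycleSketch {
        interface Op {
            void initialize() throws Exception;
            void run() throws Exception;
        }

        void execute(Op op) {
            try {
                op.initialize();
            } catch (Exception e) {
                fail(e);
                return; // a failed setup must not fall through to run()
            }
            try {
                op.run();
                complete();
            } catch (Exception e) {
                fail(e);
            }
        }

        private void complete() {
            // joblet.notifyStageletComplete(...) in the real code
        }

        private void fail(Exception e) {
            e.printStackTrace(); // abort() + joblet.notifyStageletFailed(...) in the real code
        }
    }
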
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
index 937e0a4..5013812 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
@@ -37,17 +37,16 @@
 
     @Override
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         JobSpecification spec = plan.getJobSpecification();
-        final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
-        final HashDataWriter hashWriter = new HashDataWriter(ctx, consumerPartitionCount, edwFactory, spec
-                .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+        final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
+            .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
         return hashWriter;
     }
 
     @Override
     public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
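
With partition arrays gone from operator descriptors, a connector can no longer compute spec.getConsumer(this).getPartitions().length; the scheduler now passes nConsumerPartitions straight into createSendSideWriter. The routing decision itself is unchanged in spirit: a tuple's partitioning key selects one of the consumer-side writers. A hedged sketch of that selection (the real HashDataWriter uses an ITuplePartitionComputer created by tpcf, not hashCode):

    // hashCode() substitutes for the tuple partition computer to show only
    // the index selection over nConsumerPartitions writers.
    final class HashRouteSketch {
        static int route(Object partitioningKey, int nConsumerPartitions) {
            // mask the sign bit so the index is non-negative, then wrap
            return (partitioningKey.hashCode() & Integer.MAX_VALUE) % nConsumerPartitions;
        }

        public static void main(String[] args) {
            System.out.println(route("customer-42", 4)); // always in [0, 4)
        }
    }
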
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
index 9f36730..28cd1fa 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
@@ -36,7 +36,7 @@
     private final IBinaryComparatorFactory[] comparatorFactories;
 
     public MToNHashPartitioningMergingConnectorDescriptor(JobSpecification spec, ITuplePartitionComputerFactory tpcf,
-            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
+        int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
         super(spec);
         this.tpcf = tpcf;
         this.sortFields = sortFields;
@@ -45,19 +45,18 @@
 
     @Override
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         JobSpecification spec = plan.getJobSpecification();
-        final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
-        final HashDataWriter hashWriter = new HashDataWriter(ctx, consumerPartitionCount, edwFactory, spec
-                .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+        final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
+            .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
         return hashWriter;
     }
 
     @Override
-    public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux, int index)
-            throws HyracksDataException {
+    public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-        for(int i = 0; i < comparatorFactories.length; ++i) {
+        for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
         JobSpecification spec = plan.getJobSpecification();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
index f455f01..d3a8af9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
@@ -41,7 +41,7 @@
         private final FrameTupleAccessor tupleAccessor;
 
         public RangeDataWriter(HyracksContext ctx, int consumerPartitionCount, IFrameWriter[] epWriters,
-                FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
+            FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
             this.consumerPartitionCount = consumerPartitionCount;
             this.epWriters = epWriters;
             this.appenders = appenders;
@@ -106,12 +106,11 @@
 
     @Override
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         JobSpecification spec = plan.getJobSpecification();
-        final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
-        final IFrameWriter[] epWriters = new IFrameWriter[consumerPartitionCount];
-        final FrameTupleAppender[] appenders = new FrameTupleAppender[consumerPartitionCount];
-        for (int i = 0; i < consumerPartitionCount; ++i) {
+        final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+        final FrameTupleAppender[] appenders = new FrameTupleAppender[nConsumerPartitions];
+        for (int i = 0; i < nConsumerPartitions; ++i) {
             try {
                 epWriters[i] = edwFactory.createFrameWriter(i);
                 appenders[i] = new FrameTupleAppender(ctx);
@@ -120,14 +119,14 @@
                 throw new HyracksDataException(e);
             }
         }
-        final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, consumerPartitionCount, epWriters, appenders, spec
-                .getConnectorRecordDescriptor(this));
+        final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, nConsumerPartitions, epWriters, appenders, spec
+            .getConnectorRecordDescriptor(this));
         return rangeWriter;
     }
 
     @Override
     public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
index 50d804a..324c39f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
@@ -36,11 +36,9 @@
 
     @Override
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException {
-        JobSpecification spec = plan.getJobSpecification();
-        final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
-        final IFrameWriter[] epWriters = new IFrameWriter[consumerPartitionCount];
-        for (int i = 0; i < consumerPartitionCount; ++i) {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+        for (int i = 0; i < nConsumerPartitions; ++i) {
             epWriters[i] = edwFactory.createFrameWriter(i);
         }
         return new IFrameWriter() {
@@ -73,7 +71,7 @@
 
     @Override
     public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
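
The replicating connector follows the same pattern: the consumer count arrives as a parameter, and every frame written on the send side is forwarded to all consumer-side writers. A small sketch of that broadcast loop (FrameWriterSketch stands in for IFrameWriter, and rewinding before each write assumes every downstream writer reads the frame from position zero):

    import java.nio.ByteBuffer;

    // FrameWriterSketch stands in for IFrameWriter; only nextFrame is sketched.
    final class BroadcastSketch {
        interface FrameWriterSketch {
            void nextFrame(ByteBuffer frame) throws Exception;
        }

        private final FrameWriterSketch[] epWriters; // one per consumer partition

        BroadcastSketch(FrameWriterSketch[] epWriters) {
            this.epWriters = epWriters;
        }

        public void nextFrame(ByteBuffer frame) throws Exception {
            for (FrameWriterSketch w : epWriters) {
                frame.rewind(); // every consumer reads the same frame from the start
                w.nextFrame(frame);
            }
        }
    }
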
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
index c46026a..fff3aa6 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
@@ -34,13 +34,13 @@
 
     @Override
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return edwFactory.createFrameWriter(index);
     }
 
     @Override
     public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
index b411020..2e610a9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
@@ -74,16 +74,6 @@
     }
 
     @Override
-    public String[] getPartitions() {
-        return partitions;
-    }
-
-    @Override
-    public void setPartitions(String[] partitions) {
-        this.partitions = partitions;
-    }
-
-    @Override
     public RecordDescriptor[] getOutputRecordDescriptors() {
         return recordDescriptors;
     }
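
Deleting getPartitions/setPartitions turns operator descriptors into pure dataflow declarations; where each partition runs is now a scheduler decision (operatorlocationdecision in the Overlog rules below), seeded from location constraints such as the ChoiceLocationConstraint added by this commit. A hedged sketch of stating placement intent, where AbsoluteLocationConstraint(String nodeId) is assumed as the ABSOLUTE counterpart and does not appear in this diff:

    import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint; // assumed class
    import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
    import edu.uci.ics.hyracks.api.constraints.LocationConstraint;

    class PlacementSketch {
        // "run this partition on nc1 if possible, otherwise nc2"
        LocationConstraint eitherNode() {
            return new ChoiceLocationConstraint(
                new AbsoluteLocationConstraint("nc1"),   // constructor signature assumed
                new AbsoluteLocationConstraint("nc2"));
        }
    }
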
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index ba7d0f7..b120ba88 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -76,54 +76,93 @@
 
 watch(jobstage, a);
 
+define(jobattempt, keys(), {UUID, Integer});
+
+jobattempt(JobId, 0) :-
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
+    jobstart(JobId, _);
+
+jobattempt(JobId, NextAttempt) :-
+    jobattempt(JobId, Attempt),
+    stagestart(JobId, _, Attempt),
+    abortcomplete(JobId, _, Attempt)
+    {
+        NextAttempt := Attempt + 1;
+    };
+
 define(stagestart, keys(), {UUID, Integer, Integer});
-define(stagefinish, keys(0, 1), {UUID, Integer, Set});
+define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
 
 watch(jobstart, i);
 
-stagestart_INITIAL stagestart(JobId, 0, 0) :-
-    jobstart(JobId, _),
-    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
-    notin stagestart(JobId, _, _);
+stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
+    jobattempt#insert(JobId, Attempt);
 
 update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
     job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
     jobstart(JobId, _);
 
-stagestart_NEXT stagestart(JobId, NextStageNumber, 0) :-
-    stagestart(JobId, StageNumber, _),
-    stagefinish#insert(StageId, StageNumber, _)
+stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
+    stagestart(JobId, StageNumber, Attempt),
+    stagefinish#insert(StageId, StageNumber, Attempt, _)
     {
         NextStageNumber := StageNumber + 1;
     };
 
-stagestart_AGAIN stagestart(JobId, StageNumber, NextAttempt) :-
-    stagestart(JobId, StageNumber, Attempt),
-    abortcomplete(JobId, StageId, Attempt),
-    jobstage(JobId, StageNumber, StageId)
-    {
-        NextAttempt := Attempt + 1;
-    };
-
 watch(stagestart, a);
 watch(stagestart, d);
 
-define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String});
+define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, String});
 
-activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId) :-
-    stagestart(JobId, StageNumber, Attempt),
-    operatordescriptor(JobId, OperatorId, _),
+operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
+    operatorlocation(JobId, OperatorId, NodeId, Partition),
+    availablenodes(NodeId)
+    {
+        Benefit := NodeId;
+    };
+
+watch(operatorlocationcandidates, a);
+watch(operatorlocationcandidates, i);
+watch(operatorlocationcandidates, d);
+
+define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, String});
+
+maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
+    operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
+
+watch(maxoperatorlocationbenefit, a);
+watch(maxoperatorlocationbenefit, i);
+watch(maxoperatorlocationbenefit, d);
+
+define(operatorlocationdecision, keys(0, 1, 3), {UUID, OperatorDescriptorId, String, Integer});
+
+watch(operatorlocationdecision, a);
+watch(operatorlocationdecision, i);
+watch(operatorlocationdecision, d);
+
+operatorlocationdecision(JobId, OperatorId, NodeId, Partition) :-
+    operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
+    maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
+
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
+
+activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
+    stagestart#insert(JobId, StageNumber, Attempt),
+    operatordescriptor(JobId, OperatorId, _, _),
     activitystage(JobId, OperatorId, ActivityId, StageNumber),
     jobstage(JobId, StageNumber, StageId),
-    operatorlocation(JobId, OperatorId, NodeId);
+    operatorlocationdecision(JobId, OperatorId, NodeId, Partition);
 
 watch(activitystart, a);
 
 define(stageletstart, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
 
-stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityId>) :-
-    activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId),
-    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _);
+stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityInfo>) :-
+    activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition),
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _)
+    {
+        ActivityInfo := [ActivityId, Partition];
+    };
 
 watch(stageletstart, a);
 watch(stageletstart, i);
@@ -131,9 +170,11 @@
 define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
 
 startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
-    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet)
+    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
+    availablenodes(NodeId),
+    ActivityInfoSet.size() != 0
     {
-        Tuple := [NodeId, ActivityIdSet];
+        Tuple := [NodeId, ActivityInfoSet];
     };
 
 startmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
@@ -145,18 +186,29 @@
 define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
 
 stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
-    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
-    failednodes(NodeId);
+    stageletfailure(JobId, StageId, NodeId, Attempt),
+    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet);
 
-stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
-    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
-    stageletabort(JobId, StageId, _, _, Attempt, _);
+stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
+    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, _),
+    stageletstart(JobId, StageId, _, NodeIdOther, Attempt, ActivityIdSet),
+    failednodes(NodeId),
+    notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
+
+watch(stageletabort, a);
+watch(stageletabort, i);
+watch(stageletabort, d);
+
+define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageabort(JobId, StageId, Attempt, set<NodeId>) :-
+    stageletabort(JobId, StageId, _, NodeId, Attempt, _);
 
 define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
 
 abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
     stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
-    notin failednodes(NodeId)
+    availablenodes(NodeId)
     {
         Tuple := [NodeId, ActivityIdSet];
     };
@@ -167,24 +219,33 @@
 watch(abortmessage, a);
 watch(abortmessage, i);
 
-define(abortnotify_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+define(stageletabortcomplete, keys(), {UUID, UUID, String, Integer});
 
-abortnotify_agg(JobId, StageId, Attempt, set<NodeId>) :-
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
     abortnotify(JobId, StageId, NodeId, Attempt);
 
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+    stageletabort(JobId, StageId, _, NodeId, Attempt, _),
+    failednodes(NodeId);
+
+define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletabortcomplete_agg(JobId, StageId, Attempt, set<NodeId>) :-
+    stageletabortcomplete(JobId, StageId, NodeId, Attempt);
+
 define(abortcomplete, keys(), {UUID, UUID, Integer});
 
 abortcomplete(JobId, StageId, Attempt) :-
-    abortnotify_agg(JobId, StageId, Attempt, NodeIdSet),
-    abortmessage_agg(JobId, StageId, Attempt, _, TSet),
-    TSet.size() == NodeIdSet.size();
+    stageletabortcomplete_agg(JobId, StageId, Attempt, NodeIdSet1),
+    stageabort(JobId, StageId, Attempt, NodeIdSet2),
+    NodeIdSet1.size() == NodeIdSet2.size();
 
 define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
 
 stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
     stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
 
-stagefinish(JobId, StageNumber, SSet) :-
+stagefinish(JobId, StageNumber, Attempt, SSet) :-
     startmessage_agg(JobId, StageId, Attempt, _, TSet),
     stageletcomplete_agg(JobId, StageId, Attempt, SSet),
     jobstage(JobId, StageNumber, StageId),
@@ -192,6 +253,17 @@
 
 update_job_status_TERMINATED job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
     job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
-    stagestart#insert(JobId, StageNumber),
-    stagefinish(JobId, _, SSet),
-    notin jobstage(JobId, StageNumber);
\ No newline at end of file
+    stagestart#insert(JobId, StageNumber, Attempt),
+    stagefinish(JobId, _, Attempt, SSet),
+    notin jobstage(JobId, StageNumber);
+
+define(jobcleanup_agg, keys(0), {UUID, Set});
+
+jobcleanup_agg(JobId, set<NodeId>) :-
+    stagestart#insert(JobId, StageNumber, Attempt),
+    stagefinish(JobId, _, Attempt, _),
+    operatorlocationdecision(JobId, _, NodeId, _),
+    notin jobstage(JobId, StageNumber);
+
+jobcleanup(JobId, NodeIdSet) :-
+    jobcleanup_agg(JobId, NodeIdSet);
\ No newline at end of file
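
Taken together, the two jobattempt rules near the top of this file encode the entire fault-recovery loop: attempt 0 is deduced from jobstart, and once an attempt has both started a stage and completed the resulting abort, attempt N + 1 is deduced, which re-fires stagestart_INITIAL at stage 0. The same deduction order in imperative form, as a plain sketch:

    // Imperative rendering of the two jobattempt rules: attempt 0 comes from
    // jobstart, and each completed abort of attempt N deduces attempt N + 1,
    // which restarts the job from stage 0.
    final class AttemptLoopSketch {
        private int attempt = 0; // jobattempt(JobId, 0) on jobstart

        void onAbortComplete(int abortedAttempt) {
            if (abortedAttempt == attempt) {
                attempt = abortedAttempt + 1;  // jobattempt(JobId, NextAttempt)
                restartFromStageZero(attempt); // stagestart(JobId, 0, Attempt)
            }
        }

        private void restartFromStageZero(int newAttempt) {
            System.out.println("attempt " + newAttempt + " restarts at stage 0");
        }
    }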