Added state awareness of partitions to CC

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@458 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 56002a8..7de7ab8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -17,7 +17,6 @@
 import java.io.File;
 import java.rmi.registry.LocateRegistry;
 import java.rmi.registry.Registry;
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -73,6 +72,7 @@
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 
@@ -296,13 +296,13 @@
     }
 
     @Override
-    public void registerPartitionProvider(PartitionId pid, String nodeId) throws Exception {
-        jobQueue.schedule(new RegisterPartitionAvailibilityEvent(this, pid, nodeId));
+    public void registerPartitionProvider(PartitionId pid, String nodeId, PartitionState state) throws Exception {
+        jobQueue.schedule(new RegisterPartitionAvailibilityEvent(this, pid, nodeId, state));
     }
 
     @Override
-    public void registerPartitionRequest(Collection<PartitionId> requiredPartitionIds, String nodeId) {
-        jobQueue.schedule(new RegisterPartitionRequestEvent(this, requiredPartitionIds, nodeId));
+    public void registerPartitionRequest(PartitionId pid, String nodeId, PartitionState minState) {
+        jobQueue.schedule(new RegisterPartitionRequestEvent(this, pid, nodeId, minState));
     }
 
     private class DeadNodeSweeper extends TimerTask {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 6c851cc..acf6750 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -26,6 +26,7 @@
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.cc.scheduler.IJobRunStateMachine;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 
 public class JobRun implements IJobStatusConditionVariable {
@@ -33,9 +34,9 @@
 
     private final JobActivityGraph jag;
 
-    private final Map<PartitionId, Set<String>> partitionAvailabilityMap;
+    private final Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap;
 
-    private final Map<PartitionId, Set<String>> partitionRequestorMap;
+    private final Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap;
 
     private final Set<String> participatingNodeIds;
 
@@ -52,8 +53,8 @@
     public JobRun(UUID jobId, JobActivityGraph plan) {
         this.jobId = jobId;
         this.jag = plan;
-        partitionAvailabilityMap = new HashMap<PartitionId, Set<String>>();
-        partitionRequestorMap = new HashMap<PartitionId, Set<String>>();
+        partitionAvailabilityMap = new HashMap<PartitionId, Map<String, PartitionState>>();
+        partitionRequestorMap = new HashMap<PartitionId, Map<String, PartitionState>>();
         participatingNodeIds = new HashSet<String>();
         profile = new JobProfile(jobId);
         activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
@@ -107,11 +108,11 @@
         return jsm;
     }
 
-    public Map<PartitionId, Set<String>> getPartitionAvailabilityMap() {
+    public Map<PartitionId, Map<String, PartitionState>> getPartitionAvailabilityMap() {
         return partitionAvailabilityMap;
     }
 
-    public Map<PartitionId, Set<String>> getPartitionRequestorMap() {
+    public Map<PartitionId, Map<String, PartitionState>> getPartitionRequestorMap() {
         return partitionRequestorMap;
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
index 1bcc13b..8a35cf1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
@@ -14,9 +14,9 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -24,16 +24,20 @@
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
 import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 
 public class RegisterPartitionAvailibilityEvent extends AbstractEvent {
     private final ClusterControllerService ccs;
     private final PartitionId pid;
     private final String nodeId;
+    private final PartitionState state;
 
-    public RegisterPartitionAvailibilityEvent(ClusterControllerService ccs, PartitionId pid, String nodeId) {
+    public RegisterPartitionAvailibilityEvent(ClusterControllerService ccs, PartitionId pid, String nodeId,
+            PartitionState state) {
         this.ccs = ccs;
         this.pid = pid;
         this.nodeId = nodeId;
+        this.state = state;
     }
 
     @Override
@@ -42,26 +46,32 @@
         if (run == null) {
             return;
         }
-        Map<PartitionId, Set<String>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
-        Set<String> paSet = partitionAvailabilityMap.get(pid);
-        if (paSet == null) {
-            paSet = new HashSet<String>();
-            partitionAvailabilityMap.put(pid, paSet);
+        Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
+        Map<String, PartitionState> paMap = partitionAvailabilityMap.get(pid);
+        if (paMap == null) {
+            paMap = new HashMap<String, PartitionState>();
+            partitionAvailabilityMap.put(pid, paMap);
         }
-        paSet.add(nodeId);
+        paMap.put(nodeId, state);
 
         NodeControllerState availNcs = ccs.getNodeMap().get(nodeId);
-        Map<PartitionId, Set<String>> partitionRequestorMap = run.getPartitionRequestorMap();
-        Set<String> prSet = partitionRequestorMap.get(pid);
-        if (prSet != null) {
-            for (String requestor : prSet) {
-                NodeControllerState ncs = ccs.getNodeMap().get(requestor);
-                if (ncs != null) {
-                    try {
-                        INodeController nc = ncs.getNodeController();
-                        nc.reportPartitionAvailability(pid, availNcs.getDataPort());
-                    } catch (Exception e) {
-                        e.printStackTrace();
+        Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap = run.getPartitionRequestorMap();
+        Map<String, PartitionState> prMap = partitionRequestorMap.get(pid);
+        if (prMap != null) {
+            Iterator<Map.Entry<String, PartitionState>> i = prMap.entrySet().iterator();
+            while (i.hasNext()) {
+                Map.Entry<String, PartitionState> requestor = i.next();
+                PartitionState minState = requestor.getValue();
+                if (state.isAtLeast(minState)) {
+                    i.remove();
+                    NodeControllerState ncs = ccs.getNodeMap().get(requestor.getKey());
+                    if (ncs != null) {
+                        try {
+                            INodeController nc = ncs.getNodeController();
+                            nc.reportPartitionAvailability(pid, availNcs.getDataPort());
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
                     }
                 }
             }
@@ -70,6 +80,6 @@
 
     @Override
     public String toString() {
-        return "PartitionAvailable@" + nodeId + "[" + pid + "]";
+        return "PartitionAvailable@" + nodeId + "[" + pid + "]" + state;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
index e578f77..2f2adc0 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
@@ -14,10 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
-import java.util.Collection;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -26,62 +24,67 @@
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
 import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 
 public class RegisterPartitionRequestEvent extends AbstractEvent {
     private final ClusterControllerService ccs;
-    private final Collection<PartitionId> requiredPartitionIds;
+    private final PartitionId pid;
     private final String nodeId;
+    private final PartitionState minState;
 
-    public RegisterPartitionRequestEvent(ClusterControllerService ccs, Collection<PartitionId> requiredPartitionIds,
-            String nodeId) {
+    public RegisterPartitionRequestEvent(ClusterControllerService ccs, PartitionId pid, String nodeId,
+            PartitionState minState) {
         this.ccs = ccs;
-        this.requiredPartitionIds = requiredPartitionIds;
+        this.pid = pid;
         this.nodeId = nodeId;
+        this.minState = minState;
     }
 
     @Override
     public void run() {
         NodeControllerState ncs = ccs.getNodeMap().get(nodeId);
         if (ncs != null) {
-            for (PartitionId pid : requiredPartitionIds) {
-                JobRun run = ccs.getRunMap().get(pid.getJobId());
-                if (run == null) {
-                    return;
-                }
+            JobRun run = ccs.getRunMap().get(pid.getJobId());
+            if (run == null) {
+                return;
+            }
 
-                Map<PartitionId, Set<String>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
-                Set<String> paSet = partitionAvailabilityMap.get(pid);
-                boolean matched = false;
-                if (paSet != null && !paSet.isEmpty()) {
-                    for (String availablePartitionLocation : paSet) {
-                        NodeControllerState availNcs = ccs.getNodeMap().get(availablePartitionLocation);
+            Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
+            Map<String, PartitionState> paMap = partitionAvailabilityMap.get(pid);
+            boolean matched = false;
+            if (paMap != null && !paMap.isEmpty()) {
+                for (Map.Entry<String, PartitionState> pa : paMap.entrySet()) {
+                    PartitionState state = pa.getValue();
+                    if (state.isAtLeast(minState)) {
+                        NodeControllerState availNcs = ccs.getNodeMap().get(pa.getKey());
                         if (availNcs != null) {
                             NetworkAddress networkAddress = availNcs.getDataPort();
                             try {
                                 INodeController nc = ncs.getNodeController();
                                 nc.reportPartitionAvailability(pid, networkAddress);
                                 matched = true;
+                                break;
                             } catch (Exception e) {
                                 e.printStackTrace();
                             }
                         }
                     }
                 }
-                if (!matched) {
-                    Map<PartitionId, Set<String>> partitionRequestorMap = run.getPartitionRequestorMap();
-                    Set<String> prSet = partitionRequestorMap.get(pid);
-                    if (prSet == null) {
-                        prSet = new HashSet<String>();
-                        partitionRequestorMap.put(pid, prSet);
-                    }
-                    prSet.add(nodeId);
+            }
+            if (!matched) {
+                Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap = run.getPartitionRequestorMap();
+                Map<String, PartitionState> prMap = partitionRequestorMap.get(pid);
+                if (prMap == null) {
+                    prMap = new HashMap<String, PartitionState>();
+                    partitionRequestorMap.put(pid, prMap);
                 }
+                prMap.put(nodeId, minState);
             }
         }
     }
 
     @Override
     public String toString() {
-        return "PartitionRequest@[" + nodeId + "][" + requiredPartitionIds + "]";
+        return "PartitionRequest@[" + nodeId + "][" + pid + "]" + minState;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 9791161..8dbb180 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -15,7 +15,6 @@
 package edu.uci.ics.hyracks.control.common.base;
 
 import java.rmi.Remote;
-import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 
@@ -23,6 +22,7 @@
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 
@@ -40,7 +40,7 @@
 
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
 
-    public void registerPartitionProvider(PartitionId pid, String nodeId) throws Exception;
+    public void registerPartitionProvider(PartitionId pid, String nodeId, PartitionState state) throws Exception;
 
-    public void registerPartitionRequest(Collection<PartitionId> requiredPartitionIds, String nodeId) throws Exception;
+    public void registerPartitionRequest(PartitionId pid, String nodeId, PartitionState minState) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
new file mode 100644
index 0000000..412cbac
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.control.common.job;
+
+public enum PartitionState {
+    STARTED,
+    COMMITTED;
+
+    public boolean isAtLeast(PartitionState minState) {
+        switch (this) {
+            case COMMITTED:
+                return true;
+
+            case STARTED:
+                return minState == STARTED;
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 30434b8..9c12cd9 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -212,12 +213,12 @@
         return counter;
     }
 
-    public synchronized void advertisePartitionRequest(Collection<PartitionId> requiredPartitionIds,
-            IPartitionCollector collector) throws Exception {
-        for (PartitionId pid : requiredPartitionIds) {
+    public synchronized void advertisePartitionRequest(Collection<PartitionId> pids, IPartitionCollector collector,
+            PartitionState minState) throws Exception {
+        for (PartitionId pid : pids) {
             partitionRequestMap.put(pid, collector);
+            nodeController.getClusterController().registerPartitionRequest(pid, nodeController.getId(), minState);
         }
-        nodeController.getClusterController().registerPartitionRequest(requiredPartitionIds, nodeController.getId());
     }
 
     public synchronized void reportPartitionAvailability(PartitionChannel channel) throws HyracksException {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index d5ce485..20bf9da 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -206,7 +207,7 @@
         try {
             collector.open();
             try {
-                joblet.advertisePartitionRequest(collector.getRequiredPartitionIds(), collector);
+                joblet.advertisePartitionRequest(collector.getRequiredPartitionIds(), collector, PartitionState.STARTED);
                 IFrameReader reader = collector.getReader();
                 reader.open();
                 try {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index a74f099..353ba32 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -70,5 +70,6 @@
     public void close() throws HyracksDataException {
         ctx.getIOManager().close(handle);
         manager.registerPartition(pid, new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()));
+        manager.notifyPartitionCommit(pid);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index e5e23bb..b676e33 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -58,7 +59,15 @@
             pList.add(partition);
         }
         try {
-            ncs.getClusterController().registerPartitionProvider(pid, ncs.getId());
+            ncs.getClusterController().registerPartitionProvider(pid, ncs.getId(), PartitionState.STARTED);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public void notifyPartitionCommit(PartitionId pid) throws HyracksDataException {
+        try {
+            ncs.getClusterController().registerPartitionProvider(pid, ncs.getId(), PartitionState.COMMITTED);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index c667ef4..ec6e25d 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -74,6 +74,7 @@
 
     @Override
     public void close() throws HyracksDataException {
+        manager.notifyPartitionCommit(pid);
         delegate.close();
     }
 }
\ No newline at end of file