Converted OperatorDescriptorId and ConnectorDescriptorId to hold ints instead of UUIDs

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@514 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
index 86d8118..3133a7f 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -15,24 +15,23 @@
 package edu.uci.ics.hyracks.api.dataflow;
 
 import java.io.Serializable;
-import java.util.UUID;
 
 public final class ConnectorDescriptorId implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private UUID id;
+    private int id;
 
-    public ConnectorDescriptorId(UUID id) {
+    public ConnectorDescriptorId(int id) {
         this.id = id;
     }
 
-    public UUID getId() {
+    public int getId() {
         return id;
     }
 
     @Override
     public int hashCode() {
-        return id.hashCode();
+        return id;
     }
 
     @Override
@@ -44,7 +43,7 @@
             return false;
         }
         ConnectorDescriptorId other = (ConnectorDescriptorId) obj;
-        return id.equals(other.id);
+        return id == other.id;
     }
 
     @Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
index bbf3cf5..b858736 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -15,24 +15,23 @@
 package edu.uci.ics.hyracks.api.dataflow;
 
 import java.io.Serializable;
-import java.util.UUID;
 
 public final class OperatorDescriptorId implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final UUID id;
+    private final int id;
 
-    public OperatorDescriptorId(UUID id) {
+    public OperatorDescriptorId(int id) {
         this.id = id;
     }
 
-    public UUID getId() {
+    public int getId() {
         return id;
     }
 
     @Override
     public int hashCode() {
-        return id.hashCode();
+        return id;
     }
 
     @Override
@@ -43,7 +42,7 @@
         if (!(o instanceof OperatorDescriptorId)) {
             return false;
         }
-        return ((OperatorDescriptorId) o).id.equals(id);
+        return ((OperatorDescriptorId) o).id == id;
     }
 
     @Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 79d10ab..e3d28ea 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -58,6 +58,10 @@
 
     private int maxAttempts;
 
+    private transient int operatorIdCounter;
+
+    private transient int connectorIdCounter;
+
     public JobSpecification() {
         roots = new ArrayList<OperatorDescriptorId>();
         opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
@@ -67,6 +71,16 @@
         connectorOpMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>>();
         properties = new HashMap<String, Serializable>();
         userConstraints = new HashSet<Constraint>();
+        operatorIdCounter = 0;
+        connectorIdCounter = 0;
+    }
+
+    public OperatorDescriptorId createOperatorDescriptorId() {
+        return new OperatorDescriptorId(operatorIdCounter++);
+    }
+
+    public ConnectorDescriptorId createConnectorDescriptor() {
+        return new ConnectorDescriptorId(connectorIdCounter++);
     }
 
     public void addRoot(IOperatorDescriptor op) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index 91626f7..ae95ca8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -15,12 +15,9 @@
 package edu.uci.ics.hyracks.control.cc.job;
 
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 
 public class ActivityCluster {
     private final JobRun jobRun;
@@ -33,16 +30,11 @@
 
     private ActivityClusterPlan acp;
 
-    private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
-
-    private Set<TaskCluster> inProgressTaskClusters;
-
     public ActivityCluster(JobRun jobRun, Set<ActivityId> activities) {
         this.jobRun = jobRun;
         this.activities = activities;
         dependencies = new HashSet<ActivityCluster>();
         dependents = new HashSet<ActivityCluster>();
-        inProgressTaskClusters = new HashSet<TaskCluster>();
     }
 
     public Set<ActivityId> getActivities() {
@@ -80,16 +72,4 @@
     public void setPlan(ActivityClusterPlan acp) {
         this.acp = acp;
     }
-
-    public void setConnectorPolicyMap(Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
-        this.connectorPolicies = connectorPolicies;
-    }
-
-    public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
-        return connectorPolicies;
-    }
-
-    public Set<TaskCluster> getInProgressTaskClusters() {
-        return inProgressTaskClusters;
-    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
index e4353f8..387d3bd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
@@ -19,23 +19,28 @@
 import org.json.JSONObject;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
 
 public class GetJobProfileJSONEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final UUID jobId;
-    private final int attempt;
     private JSONObject profile;
 
-    public GetJobProfileJSONEvent(ClusterControllerService ccs, UUID jobId, int attempt) {
+    public GetJobProfileJSONEvent(ClusterControllerService ccs, UUID jobId) {
         this.ccs = ccs;
         this.jobId = jobId;
-        this.attempt = attempt;
     }
 
     @Override
     protected void doRun() throws Exception {
         profile = new JSONObject();
+        JobRun jobRun = ccs.getRunMap().get(jobId);
+        if (jobRun == null) {
+            profile = new JSONObject();
+            return;
+        }
+        profile = jobRun.getJobProfile().toJSON();
     }
 
     public JSONObject getProfile() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
index 5396e2c..1aa19b6 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
@@ -17,7 +17,6 @@
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 28a28ad..dfb0e14 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -367,7 +367,7 @@
         tcAttempt.initializePendingTaskCounter();
         tcAttempts.add(tcAttempt);
         tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
-        tc.getActivityCluster().getInProgressTaskClusters().add(tc);
+        inProgressTaskClusters.add(tc);
     }
 
     private static String findLocationOfBlocker(JobRun jobRun, JobActivityGraph jag, TaskId tid) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
index 716dfcb..b62b391 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
@@ -53,19 +53,12 @@
                     GetJobSpecificationJSONEvent gjse = new GetJobSpecificationJSONEvent(ccs, jobId);
                     ccs.getJobQueue().scheduleAndSync(gjse);
                     result.put("result", gjse.getSpecification());
-                }
-                break;
-            }
-
-            case 3: {
-                UUID jobId = UUID.fromString(arguments[0]);
-                int attempt = Integer.parseInt(arguments[1]);
-
-                if ("profile".equalsIgnoreCase(arguments[2])) {
-                    GetJobProfileJSONEvent gjpe = new GetJobProfileJSONEvent(ccs, jobId, attempt);
+                } else if ("profile".equalsIgnoreCase(arguments[1])) {
+                    GetJobProfileJSONEvent gjpe = new GetJobProfileJSONEvent(ccs, jobId);
                     ccs.getJobQueue().scheduleAndSync(gjpe);
                     result.put("result", gjpe.getProfile());
                 }
+
                 break;
             }
         }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
index c3a4e20..15512ae 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
@@ -43,7 +43,7 @@
 public class ConnectionManager {
     private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
 
-    static final int INITIAL_MESSAGE_SIZE = 40;
+    static final int INITIAL_MESSAGE_SIZE = 28;
 
     private final IHyracksRootContext ctx;
 
@@ -123,7 +123,6 @@
         private final List<SocketChannel> pendingIncomingConnections;
         private final Set<SocketChannel> pendingNegotiations;
         private final List<INetworkChannel> pendingOutgoingConnections;
-        private final List<INetworkChannel> pendingAbortConnections;
 
         public DataListenerThread() {
             super("Hyracks Data Listener Thread");
@@ -136,7 +135,6 @@
             pendingIncomingConnections = new ArrayList<SocketChannel>();
             pendingNegotiations = new HashSet<SocketChannel>();
             pendingOutgoingConnections = new ArrayList<INetworkChannel>();
-            pendingAbortConnections = new ArrayList<INetworkChannel>();
         }
 
         synchronized void addIncomingConnection(SocketChannel sc) throws IOException {
@@ -149,11 +147,6 @@
             selector.wakeup();
         }
 
-        synchronized void addPendingAbortConnections(List<INetworkChannel> abortConnections) {
-            pendingAbortConnections.addAll(abortConnections);
-            selector.wakeup();
-        }
-
         @Override
         public void run() {
             while (!stopped) {
@@ -184,15 +177,6 @@
                             }
                             pendingOutgoingConnections.clear();
                         }
-                        if (!pendingAbortConnections.isEmpty()) {
-                            for (INetworkChannel nc : pendingAbortConnections) {
-                                SelectionKey key = nc.getSelectionKey();
-                                nc.abort();
-                                nc.dispatchNetworkEvent();
-                                key.cancel();
-                            }
-                            pendingAbortConnections.clear();
-                        }
                         if (LOGGER.isLoggable(Level.FINE)) {
                             LOGGER.fine("Selector: " + n);
                         }
@@ -248,7 +232,7 @@
 
         private PartitionId readInitialMessage(ByteBuffer buffer) {
             UUID jobId = readUUID(buffer);
-            ConnectorDescriptorId cdid = new ConnectorDescriptorId(readUUID(buffer));
+            ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
             int senderIndex = buffer.getInt();
             int receiverIndex = buffer.getInt();
             return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index dcf530e..28ac22f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -189,7 +189,7 @@
     private void prepareForWrite() {
         writeBuffer = ByteBuffer.allocate(ConnectionManager.INITIAL_MESSAGE_SIZE);
         writeUUID(writeBuffer, partitionId.getJobId());
-        writeUUID(writeBuffer, partitionId.getConnectorDescriptorId().getId());
+        writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
         writeBuffer.putInt(partitionId.getSenderIndex());
         writeBuffer.putInt(partitionId.getReceiverIndex());
         writeBuffer.flip();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 76cb1f0..f3535f8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -14,8 +14,6 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.base;
 
-import java.util.UUID;
-
 import org.json.JSONException;
 import org.json.JSONObject;
 
@@ -31,7 +29,7 @@
     protected final ConnectorDescriptorId id;
 
     public AbstractConnectorDescriptor(JobSpecification spec) {
-        this.id = new ConnectorDescriptorId(UUID.randomUUID());
+        this.id = spec.createConnectorDescriptor();
         spec.getConnectorMap().put(id, this);
     }
 
@@ -44,7 +42,7 @@
         JSONObject jconn = new JSONObject();
 
         jconn.put("type", "connector");
-        jconn.put("id", getConnectorId().getId().toString());
+        jconn.put("id", getConnectorId().getId());
         jconn.put("java-class", getClass().getName());
 
         return jconn;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 97ca255..15fc75d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -14,8 +14,6 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.base;
 
-import java.util.UUID;
-
 import org.json.JSONException;
 import org.json.JSONObject;
 
@@ -41,7 +39,7 @@
     protected final int outputArity;
 
     public AbstractOperatorDescriptor(JobSpecification spec, int inputArity, int outputArity) {
-        odId = new OperatorDescriptorId(UUID.randomUUID());
+        odId = spec.createOperatorDescriptorId();
         this.inputArity = inputArity;
         this.outputArity = outputArity;
         recordDescriptors = new RecordDescriptor[outputArity];
@@ -78,7 +76,7 @@
     public JSONObject toJSON() throws JSONException {
         JSONObject jop = new JSONObject();
         jop.put("type", "operator");
-        jop.put("id", getOperatorId().getId().toString());
+        jop.put("id", getOperatorId().getId());
         jop.put("java-class", getClass().getName());
         jop.put("in-arity", getInputArity());
         jop.put("out-arity", getOutputArity());
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
index 7646a64..83ae7d5 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
@@ -32,8 +32,7 @@
             IHyracksRootContext rootCtx = new TestRootContext(frameSize);
             INCApplicationContext appCtx = new TestNCApplicationContext(rootCtx, null);
             IHyracksJobletContext jobletCtx = new TestJobletContext(appCtx, UUID.randomUUID());
-            TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(
-                    new OperatorDescriptorId(UUID.randomUUID()), 0), 0), 0);
+            TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), 0), 0);
             IHyracksTaskContext taskCtx = new TestTaskContext(jobletCtx, tid);
             return taskCtx;
         } catch (HyracksException e) {