Converted OperatorDescriptorId and ConnectorDescriptorId to hold ints instead of UUIDs
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@514 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
index 86d8118..3133a7f 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -15,24 +15,23 @@
package edu.uci.ics.hyracks.api.dataflow;
import java.io.Serializable;
-import java.util.UUID;
public final class ConnectorDescriptorId implements Serializable {
private static final long serialVersionUID = 1L;
- private UUID id;
+ private int id;
- public ConnectorDescriptorId(UUID id) {
+ public ConnectorDescriptorId(int id) {
this.id = id;
}
- public UUID getId() {
+ public int getId() {
return id;
}
@Override
public int hashCode() {
- return id.hashCode();
+ return id;
}
@Override
@@ -44,7 +43,7 @@
return false;
}
ConnectorDescriptorId other = (ConnectorDescriptorId) obj;
- return id.equals(other.id);
+ return id == other.id;
}
@Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
index bbf3cf5..b858736 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -15,24 +15,23 @@
package edu.uci.ics.hyracks.api.dataflow;
import java.io.Serializable;
-import java.util.UUID;
public final class OperatorDescriptorId implements Serializable {
private static final long serialVersionUID = 1L;
- private final UUID id;
+ private final int id;
- public OperatorDescriptorId(UUID id) {
+ public OperatorDescriptorId(int id) {
this.id = id;
}
- public UUID getId() {
+ public int getId() {
return id;
}
@Override
public int hashCode() {
- return id.hashCode();
+ return id;
}
@Override
@@ -43,7 +42,7 @@
if (!(o instanceof OperatorDescriptorId)) {
return false;
}
- return ((OperatorDescriptorId) o).id.equals(id);
+ return ((OperatorDescriptorId) o).id == id;
}
@Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 79d10ab..e3d28ea 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -58,6 +58,10 @@
private int maxAttempts;
+ private transient int operatorIdCounter;
+
+ private transient int connectorIdCounter;
+
public JobSpecification() {
roots = new ArrayList<OperatorDescriptorId>();
opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
@@ -67,6 +71,16 @@
connectorOpMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>>();
properties = new HashMap<String, Serializable>();
userConstraints = new HashSet<Constraint>();
+ operatorIdCounter = 0;
+ connectorIdCounter = 0;
+ }
+
+ public OperatorDescriptorId createOperatorDescriptorId() {
+ return new OperatorDescriptorId(operatorIdCounter++);
+ }
+
+ public ConnectorDescriptorId createConnectorDescriptor() {
+ return new ConnectorDescriptorId(connectorIdCounter++);
}
public void addRoot(IOperatorDescriptor op) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index 91626f7..ae95ca8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -15,12 +15,9 @@
package edu.uci.ics.hyracks.control.cc.job;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
public class ActivityCluster {
private final JobRun jobRun;
@@ -33,16 +30,11 @@
private ActivityClusterPlan acp;
- private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
-
- private Set<TaskCluster> inProgressTaskClusters;
-
public ActivityCluster(JobRun jobRun, Set<ActivityId> activities) {
this.jobRun = jobRun;
this.activities = activities;
dependencies = new HashSet<ActivityCluster>();
dependents = new HashSet<ActivityCluster>();
- inProgressTaskClusters = new HashSet<TaskCluster>();
}
public Set<ActivityId> getActivities() {
@@ -80,16 +72,4 @@
public void setPlan(ActivityClusterPlan acp) {
this.acp = acp;
}
-
- public void setConnectorPolicyMap(Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
- this.connectorPolicies = connectorPolicies;
- }
-
- public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
- return connectorPolicies;
- }
-
- public Set<TaskCluster> getInProgressTaskClusters() {
- return inProgressTaskClusters;
- }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
index e4353f8..387d3bd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
@@ -19,23 +19,28 @@
import org.json.JSONObject;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
public class GetJobProfileJSONEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final UUID jobId;
- private final int attempt;
private JSONObject profile;
- public GetJobProfileJSONEvent(ClusterControllerService ccs, UUID jobId, int attempt) {
+ public GetJobProfileJSONEvent(ClusterControllerService ccs, UUID jobId) {
this.ccs = ccs;
this.jobId = jobId;
- this.attempt = attempt;
}
@Override
protected void doRun() throws Exception {
profile = new JSONObject();
+ JobRun jobRun = ccs.getRunMap().get(jobId);
+ if (jobRun == null) {
+ profile = new JSONObject();
+ return;
+ }
+ profile = jobRun.getJobProfile().toJSON();
}
public JSONObject getProfile() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
index 5396e2c..1aa19b6 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
@@ -17,7 +17,6 @@
import java.util.UUID;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 28a28ad..dfb0e14 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -367,7 +367,7 @@
tcAttempt.initializePendingTaskCounter();
tcAttempts.add(tcAttempt);
tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
- tc.getActivityCluster().getInProgressTaskClusters().add(tc);
+ inProgressTaskClusters.add(tc);
}
private static String findLocationOfBlocker(JobRun jobRun, JobActivityGraph jag, TaskId tid) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
index 716dfcb..b62b391 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
@@ -53,19 +53,12 @@
GetJobSpecificationJSONEvent gjse = new GetJobSpecificationJSONEvent(ccs, jobId);
ccs.getJobQueue().scheduleAndSync(gjse);
result.put("result", gjse.getSpecification());
- }
- break;
- }
-
- case 3: {
- UUID jobId = UUID.fromString(arguments[0]);
- int attempt = Integer.parseInt(arguments[1]);
-
- if ("profile".equalsIgnoreCase(arguments[2])) {
- GetJobProfileJSONEvent gjpe = new GetJobProfileJSONEvent(ccs, jobId, attempt);
+ } else if ("profile".equalsIgnoreCase(arguments[1])) {
+ GetJobProfileJSONEvent gjpe = new GetJobProfileJSONEvent(ccs, jobId);
ccs.getJobQueue().scheduleAndSync(gjpe);
result.put("result", gjpe.getProfile());
}
+
break;
}
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
index c3a4e20..15512ae 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
@@ -43,7 +43,7 @@
public class ConnectionManager {
private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
- static final int INITIAL_MESSAGE_SIZE = 40;
+ static final int INITIAL_MESSAGE_SIZE = 28;
private final IHyracksRootContext ctx;
@@ -123,7 +123,6 @@
private final List<SocketChannel> pendingIncomingConnections;
private final Set<SocketChannel> pendingNegotiations;
private final List<INetworkChannel> pendingOutgoingConnections;
- private final List<INetworkChannel> pendingAbortConnections;
public DataListenerThread() {
super("Hyracks Data Listener Thread");
@@ -136,7 +135,6 @@
pendingIncomingConnections = new ArrayList<SocketChannel>();
pendingNegotiations = new HashSet<SocketChannel>();
pendingOutgoingConnections = new ArrayList<INetworkChannel>();
- pendingAbortConnections = new ArrayList<INetworkChannel>();
}
synchronized void addIncomingConnection(SocketChannel sc) throws IOException {
@@ -149,11 +147,6 @@
selector.wakeup();
}
- synchronized void addPendingAbortConnections(List<INetworkChannel> abortConnections) {
- pendingAbortConnections.addAll(abortConnections);
- selector.wakeup();
- }
-
@Override
public void run() {
while (!stopped) {
@@ -184,15 +177,6 @@
}
pendingOutgoingConnections.clear();
}
- if (!pendingAbortConnections.isEmpty()) {
- for (INetworkChannel nc : pendingAbortConnections) {
- SelectionKey key = nc.getSelectionKey();
- nc.abort();
- nc.dispatchNetworkEvent();
- key.cancel();
- }
- pendingAbortConnections.clear();
- }
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Selector: " + n);
}
@@ -248,7 +232,7 @@
private PartitionId readInitialMessage(ByteBuffer buffer) {
UUID jobId = readUUID(buffer);
- ConnectorDescriptorId cdid = new ConnectorDescriptorId(readUUID(buffer));
+ ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
int senderIndex = buffer.getInt();
int receiverIndex = buffer.getInt();
return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index dcf530e..28ac22f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -189,7 +189,7 @@
private void prepareForWrite() {
writeBuffer = ByteBuffer.allocate(ConnectionManager.INITIAL_MESSAGE_SIZE);
writeUUID(writeBuffer, partitionId.getJobId());
- writeUUID(writeBuffer, partitionId.getConnectorDescriptorId().getId());
+ writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
writeBuffer.putInt(partitionId.getSenderIndex());
writeBuffer.putInt(partitionId.getReceiverIndex());
writeBuffer.flip();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 76cb1f0..f3535f8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.hyracks.dataflow.std.base;
-import java.util.UUID;
-
import org.json.JSONException;
import org.json.JSONObject;
@@ -31,7 +29,7 @@
protected final ConnectorDescriptorId id;
public AbstractConnectorDescriptor(JobSpecification spec) {
- this.id = new ConnectorDescriptorId(UUID.randomUUID());
+ this.id = spec.createConnectorDescriptor();
spec.getConnectorMap().put(id, this);
}
@@ -44,7 +42,7 @@
JSONObject jconn = new JSONObject();
jconn.put("type", "connector");
- jconn.put("id", getConnectorId().getId().toString());
+ jconn.put("id", getConnectorId().getId());
jconn.put("java-class", getClass().getName());
return jconn;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 97ca255..15fc75d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.hyracks.dataflow.std.base;
-import java.util.UUID;
-
import org.json.JSONException;
import org.json.JSONObject;
@@ -41,7 +39,7 @@
protected final int outputArity;
public AbstractOperatorDescriptor(JobSpecification spec, int inputArity, int outputArity) {
- odId = new OperatorDescriptorId(UUID.randomUUID());
+ odId = spec.createOperatorDescriptorId();
this.inputArity = inputArity;
this.outputArity = outputArity;
recordDescriptors = new RecordDescriptor[outputArity];
@@ -78,7 +76,7 @@
public JSONObject toJSON() throws JSONException {
JSONObject jop = new JSONObject();
jop.put("type", "operator");
- jop.put("id", getOperatorId().getId().toString());
+ jop.put("id", getOperatorId().getId());
jop.put("java-class", getClass().getName());
jop.put("in-arity", getInputArity());
jop.put("out-arity", getOutputArity());
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
index 7646a64..83ae7d5 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
@@ -32,8 +32,7 @@
IHyracksRootContext rootCtx = new TestRootContext(frameSize);
INCApplicationContext appCtx = new TestNCApplicationContext(rootCtx, null);
IHyracksJobletContext jobletCtx = new TestJobletContext(appCtx, UUID.randomUUID());
- TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(
- new OperatorDescriptorId(UUID.randomUUID()), 0), 0), 0);
+ TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), 0), 0);
IHyracksTaskContext taskCtx = new TestTaskContext(jobletCtx, tid);
return taskCtx;
} catch (HyracksException e) {