Changed job id from UUID to JobId
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@515 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
index 779e9ac..1377e37 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
@@ -16,7 +16,6 @@
import java.io.File;
import java.util.EnumSet;
-import java.util.UUID;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
@@ -26,6 +25,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
@@ -65,27 +65,27 @@
}
@Override
- public UUID createJob(String appName, JobSpecification jobSpec) throws Exception {
+ public JobId createJob(String appName, JobSpecification jobSpec) throws Exception {
return createJob(appName, jobSpec, EnumSet.noneOf(JobFlag.class));
}
@Override
- public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ public JobId createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
return hci.createJob(appName, JavaSerializationUtils.serialize(jobSpec), jobFlags);
}
@Override
- public JobStatus getJobStatus(UUID jobId) throws Exception {
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
return hci.getJobStatus(jobId);
}
@Override
- public void start(UUID jobId) throws Exception {
+ public void start(JobId jobId) throws Exception {
hci.start(jobId);
}
@Override
- public void waitForCompletion(UUID jobId) throws Exception {
+ public void waitForCompletion(JobId jobId) throws Exception {
hci.waitForCompletion(jobId);
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index a164fee..659a181 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -16,9 +16,9 @@
import java.io.File;
import java.util.EnumSet;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -27,13 +27,13 @@
public void destroyApplication(String appName) throws Exception;
- public UUID createJob(String appName, JobSpecification jobSpec) throws Exception;
+ public JobId createJob(String appName, JobSpecification jobSpec) throws Exception;
- public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+ public JobId createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
- public JobStatus getJobStatus(UUID jobId) throws Exception;
+ public JobStatus getJobStatus(JobId jobId) throws Exception;
- public void start(UUID jobId) throws Exception;
+ public void start(JobId jobId) throws Exception;
- public void waitForCompletion(UUID jobId) throws Exception;
+ public void waitForCompletion(JobId jobId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index c4bf0e6..f57ac97 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -16,9 +16,9 @@
import java.rmi.Remote;
import java.util.EnumSet;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
public interface IHyracksClientInterface extends Remote {
@@ -30,11 +30,11 @@
public void destroyApplication(String appName) throws Exception;
- public UUID createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+ public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
- public JobStatus getJobStatus(UUID jobId) throws Exception;
+ public JobStatus getJobStatus(JobId jobId) throws Exception;
- public void start(UUID jobId) throws Exception;
+ public void start(JobId jobId) throws Exception;
- public void waitForCompletion(UUID jobId) throws Exception;
+ public void waitForCompletion(JobId jobId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionCollector.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionCollector.java
index e7fde8c..89e6da3 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionCollector.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionCollector.java
@@ -15,14 +15,14 @@
package edu.uci.ics.hyracks.api.comm;
import java.util.Collection;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
public interface IPartitionCollector {
- public UUID getJobId();
+ public JobId getJobId();
public ConnectorDescriptorId getConnectorId();
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
index 3901007..e3c71ea 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
@@ -14,17 +14,16 @@
*/
package edu.uci.ics.hyracks.api.context;
-import java.util.UUID;
-
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
public interface IHyracksJobletContext extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry {
public INCApplicationContext getApplicationContext();
- public UUID getJobId();
+ public JobId getJobId();
public ICounterContext getCounterContext();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index 5a58b7b..c6ca51c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -52,6 +52,6 @@
}
public String toString() {
- return "ANID:[" + odId + ":" + id + "]";
+ return "ANID:[" + odId + "]:" + id;
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
index 008fdb1..c3a0f7e 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
@@ -14,14 +14,12 @@
*/
package edu.uci.ics.hyracks.api.job;
-import java.util.UUID;
-
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
public interface IJobLifecycleListener {
- public void notifyJobCreation(UUID jobId, JobSpecification jobSpec) throws HyracksException;
+ public void notifyJobCreation(JobId jobId, JobSpecification jobSpec) throws HyracksException;
- public void notifyJobStart(UUID jobId) throws HyracksException;
+ public void notifyJobStart(JobId jobId) throws HyracksException;
- public void notifyJobFinish(UUID jobId) throws HyracksException;
+ public void notifyJobFinish(JobId jobId) throws HyracksException;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
new file mode 100644
index 0000000..ffb9bba
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+public final class JobId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final long id;
+
+ public JobId(long id) {
+ this.id = id;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof JobId)) {
+ return false;
+ }
+ return ((JobId) o).id == id;
+ }
+
+ @Override
+ public String toString() {
+ return "JID:" + id;
+ }
+
+ public static JobId parse(String str) {
+ if (str.startsWith("JID:")) {
+ str = str.substring(4);
+ return new JobId(Long.parseLong(str));
+ }
+ throw new IllegalArgumentException();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
index 3eafbd6..be25d12 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
@@ -15,14 +15,14 @@
package edu.uci.ics.hyracks.api.partitions;
import java.io.Serializable;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobId;
public final class PartitionId implements Serializable {
private static final long serialVersionUID = 1L;
- private final UUID jobId;
+ private final JobId jobId;
private final ConnectorDescriptorId cdId;
@@ -30,14 +30,14 @@
private final int receiverIndex;
- public PartitionId(UUID jobId, ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
+ public PartitionId(JobId jobId, ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
this.jobId = jobId;
this.cdId = cdId;
this.senderIndex = senderIndex;
this.receiverIndex = receiverIndex;
}
- public UUID getJobId() {
+ public JobId getJobId() {
return jobId;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
index 6ed9847..77694b4 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/CCClientInterface.java
@@ -3,11 +3,11 @@
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.EnumSet;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
public class CCClientInterface extends UnicastRemoteObject implements IHyracksClientInterface {
@@ -40,22 +40,22 @@
}
@Override
- public UUID createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
return ccs.createJob(appName, jobSpec, jobFlags);
}
@Override
- public JobStatus getJobStatus(UUID jobId) throws Exception {
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
return ccs.getJobStatus(jobId);
}
@Override
- public void start(UUID jobId) throws Exception {
+ public void start(JobId jobId) throws Exception {
ccs.start(jobId);
}
@Override
- public void waitForCompletion(UUID jobId) throws Exception {
+ public void waitForCompletion(JobId jobId) throws Exception {
ccs.waitForCompletion(jobId);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 583d791..d864c05 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -26,7 +26,6 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Level;
@@ -39,6 +38,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
@@ -94,7 +94,7 @@
private ClusterControllerInfo info;
- private final Map<UUID, JobRun> runMap;
+ private final Map<JobId, JobRun> runMap;
private final JobQueue jobQueue;
@@ -106,6 +106,8 @@
private final ICCContext ccContext;
+ private long jobCounter;
+
public ClusterControllerService(CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
@@ -115,7 +117,7 @@
ClusterControllerService.class.getName()));
taskExecutor = Executors.newCachedThreadPool();
webServer = new WebServer(this);
- runMap = new HashMap<UUID, JobRun>();
+ runMap = new HashMap<JobId, JobRun>();
jobQueue = new JobQueue();
this.timer = new Timer(true);
ccci = new CCClientInterface(this);
@@ -125,6 +127,7 @@
return ipAddressNodeNameMap;
}
};
+ jobCounter = 0;
}
@Override
@@ -152,7 +155,7 @@
return applications;
}
- public Map<UUID, JobRun> getRunMap() {
+ public Map<JobId, JobRun> getRunMap() {
return runMap;
}
@@ -176,9 +179,13 @@
return ccConfig;
}
+ private JobId createJobId() {
+ return new JobId(jobCounter++);
+ }
+
@Override
- public UUID createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- UUID jobId = UUID.randomUUID();
+ public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ JobId jobId = createJobId();
JobCreateEvent jce = new JobCreateEvent(this, jobId, appName, jobSpec, jobFlags);
jobQueue.schedule(jce);
jce.sync();
@@ -210,34 +217,34 @@
}
@Override
- public void notifyTaskComplete(UUID jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+ public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
TaskCompleteEvent sce = new TaskCompleteEvent(this, jobId, taskId, nodeId);
jobQueue.schedule(sce);
}
@Override
- public void notifyTaskFailure(UUID jobId, TaskAttemptId taskId, String nodeId, Exception exception)
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, Exception exception)
throws Exception {
TaskFailureEvent tfe = new TaskFailureEvent(this, jobId, taskId, nodeId, exception);
jobQueue.schedule(tfe);
}
@Override
- public JobStatus getJobStatus(UUID jobId) throws Exception {
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
GetJobStatusEvent gse = new GetJobStatusEvent(this, jobId);
jobQueue.scheduleAndSync(gse);
return gse.getStatus();
}
@Override
- public void start(UUID jobId) throws Exception {
+ public void start(JobId jobId) throws Exception {
JobStartEvent jse = new JobStartEvent(this, jobId);
jobQueue.schedule(jse);
}
@Override
- public void waitForCompletion(UUID jobId) throws Exception {
+ public void waitForCompletion(JobId jobId) throws Exception {
GetJobStatusConditionVariableEvent e = new GetJobStatusConditionVariableEvent(this, jobId);
jobQueue.scheduleAndSync(e);
IJobStatusConditionVariable var = e.getConditionVariable();
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index e0cee6d..2b059b2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -16,9 +16,9 @@
import java.util.HashSet;
import java.util.Set;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.base.INodeController;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
@@ -29,7 +29,7 @@
private final NetworkAddress dataPort;
- private final Set<UUID> activeJobIds;
+ private final Set<JobId> activeJobIds;
private int lastHeartbeatDuration;
@@ -37,7 +37,7 @@
this.nodeController = nodeController;
this.ncConfig = ncConfig;
this.dataPort = dataPort;
- activeJobIds = new HashSet<UUID>();
+ activeJobIds = new HashSet<JobId>();
}
public void notifyHeartbeat() {
@@ -60,7 +60,7 @@
return ncConfig;
}
- public Set<UUID> getActiveJobIds() {
+ public Set<JobId> getActiveJobIds() {
return activeJobIds;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index e375aad..c1c0161 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -18,7 +18,6 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.application.ICCBootstrap;
@@ -26,6 +25,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.control.cc.job.DeserializingJobSpecificationFactory;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
@@ -60,7 +60,7 @@
this.jobSpecFactory = jobSpecFactory;
}
- public JobSpecification createJobSpecification(UUID jobId, byte[] bytes) throws HyracksException {
+ public JobSpecification createJobSpecification(byte[] bytes) throws HyracksException {
return jobSpecFactory.createJobSpecification(bytes, (ICCBootstrap) bootstrap, this);
}
@@ -81,19 +81,19 @@
jobLifecycleListeners.add(jobLifecycleListener);
}
- public synchronized void notifyJobStart(UUID jobId) throws HyracksException {
+ public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
l.notifyJobStart(jobId);
}
}
- public synchronized void notifyJobFinish(UUID jobId) throws HyracksException {
+ public synchronized void notifyJobFinish(JobId jobId) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
l.notifyJobFinish(jobId);
}
}
- public synchronized void notifyJobCreation(UUID jobId, JobSpecification specification) throws HyracksException {
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification specification) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
l.notifyJobCreation(jobId, specification);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 7b02baf..3bea59a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -18,20 +18,20 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
public class JobRun implements IJobStatusConditionVariable {
- private final UUID jobId;
+ private final JobId jobId;
private final JobActivityGraph jag;
@@ -53,7 +53,7 @@
private Exception exception;
- public JobRun(UUID jobId, JobActivityGraph plan) {
+ public JobRun(JobId jobId, JobActivityGraph plan) {
this.jobId = jobId;
this.jag = plan;
pmm = new PartitionMatchMaker();
@@ -63,7 +63,7 @@
connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
}
- public UUID getJobId() {
+ public JobId getJobId() {
return jobId;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
index c90b8ff..6975d4e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -16,11 +16,11 @@
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -32,11 +32,11 @@
public abstract class AbstractTaskLifecycleEvent extends AbstractEvent {
protected final ClusterControllerService ccs;
- protected final UUID jobId;
+ protected final JobId jobId;
protected final TaskAttemptId taId;
protected final String nodeId;
- public AbstractTaskLifecycleEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId) {
+ public AbstractTaskLifecycleEvent(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId) {
this.ccs = ccs;
this.jobId = jobId;
this.taId = taId;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
index 387d3bd..738d86e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
@@ -14,20 +14,19 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.UUID;
-
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
public class GetJobProfileJSONEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
- private final UUID jobId;
+ private final JobId jobId;
private JSONObject profile;
- public GetJobProfileJSONEvent(ClusterControllerService ccs, UUID jobId) {
+ public GetJobProfileJSONEvent(ClusterControllerService ccs, JobId jobId) {
this.ccs = ccs;
this.jobId = jobId;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
index 195b8c1..884ddf2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
@@ -14,20 +14,19 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.UUID;
-
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
public class GetJobSpecificationJSONEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
- private final UUID jobId;
+ private final JobId jobId;
private JSONObject spec;
- public GetJobSpecificationJSONEvent(ClusterControllerService ccs, UUID jobId) {
+ public GetJobSpecificationJSONEvent(ClusterControllerService ccs, JobId jobId) {
this.ccs = ccs;
this.jobId = jobId;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java
index 7768587..357d62b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java
@@ -14,18 +14,17 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.UUID;
-
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
public class GetJobStatusConditionVariableEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
- private final UUID jobId;
+ private final JobId jobId;
private IJobStatusConditionVariable cVar;
- public GetJobStatusConditionVariableEvent(ClusterControllerService ccs, UUID jobId) {
+ public GetJobStatusConditionVariableEvent(ClusterControllerService ccs, JobId jobId) {
this.ccs = ccs;
this.jobId = jobId;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java
index 589ac34..fbe6e4c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java
@@ -14,8 +14,7 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.UUID;
-
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -23,10 +22,10 @@
public class GetJobStatusEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
- private final UUID jobId;
+ private final JobId jobId;
private JobStatus status;
- public GetJobStatusEvent(ClusterControllerService ccs, UUID jobId) {
+ public GetJobStatusEvent(ClusterControllerService ccs, JobId jobId) {
this.ccs = ccs;
this.jobId = jobId;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
index d1829e9..4274914 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
@@ -15,9 +15,9 @@
package edu.uci.ics.hyracks.control.cc.job.manager.events;
import java.util.Set;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -29,11 +29,11 @@
public class JobCleanupEvent extends AbstractEvent {
private ClusterControllerService ccs;
- private UUID jobId;
+ private JobId jobId;
private JobStatus status;
private Exception exception;
- public JobCleanupEvent(ClusterControllerService ccs, UUID jobId, JobStatus status, Exception exception) {
+ public JobCleanupEvent(ClusterControllerService ccs, JobId jobId, JobStatus status, Exception exception) {
this.ccs = ccs;
this.jobId = jobId;
this.status = status;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
index 3b49db3..5fadb00 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
@@ -15,12 +15,12 @@
package edu.uci.ics.hyracks.control.cc.job.manager.events;
import java.util.EnumSet;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -36,10 +36,10 @@
private final ClusterControllerService ccs;
private final byte[] jobSpec;
private final EnumSet<JobFlag> jobFlags;
- private final UUID jobId;
+ private final JobId jobId;
private final String appName;
- public JobCreateEvent(ClusterControllerService ccs, UUID jobId, String appName, byte[] jobSpec,
+ public JobCreateEvent(ClusterControllerService ccs, JobId jobId, String appName, byte[] jobSpec,
EnumSet<JobFlag> jobFlags) {
this.jobId = jobId;
this.ccs = ccs;
@@ -54,7 +54,7 @@
if (appCtx == null) {
throw new HyracksException("No application with id " + appName + " found");
}
- JobSpecification spec = appCtx.createJobSpecification(jobId, jobSpec);
+ JobSpecification spec = appCtx.createJobSpecification(jobSpec);
final JobActivityGraphBuilder builder = new JobActivityGraphBuilder();
builder.init(appName, spec, jobFlags);
@@ -76,7 +76,7 @@
appCtx.notifyJobCreation(jobId, spec);
}
- public UUID getJobId() {
+ public JobId getJobId() {
return jobId;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
index 64515a1..8f6357c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
@@ -14,8 +14,7 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.UUID;
-
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -23,9 +22,9 @@
public class JobStartEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
- private final UUID jobId;
+ private final JobId jobId;
- public JobStartEvent(ClusterControllerService ccs, UUID jobId) {
+ public JobStartEvent(ClusterControllerService ccs, JobId jobId) {
this.ccs = ccs;
this.jobId = jobId;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index a4b9b50..b812594 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -17,11 +17,10 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -47,7 +46,7 @@
LOGGER.info(e.getKey() + " considered dead");
}
}
- Set<UUID> affectedJobIds = new HashSet<UUID>();
+ Set<JobId> affectedJobIds = new HashSet<JobId>();
Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
for (String deadNode : deadNodes) {
NodeControllerState state = nodeMap.remove(deadNode);
@@ -66,7 +65,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Number of affected jobs: " + affectedJobIds.size());
}
- for (UUID jobId : affectedJobIds) {
+ for (JobId jobId : affectedJobIds) {
JobRun run = ccs.getRunMap().get(jobId);
if (run != null) {
run.getScheduler().notifyNodeFailures(deadNodes);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java
index a103bdd..bd82f75 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java
@@ -16,9 +16,9 @@
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.logging.Level;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
@@ -35,7 +35,7 @@
@Override
public void run() {
- Map<UUID, JobRun> runMap = ccs.getRunMap();
+ Map<JobId, JobRun> runMap = ccs.getRunMap();
for (JobProfile profile : profiles) {
JobRun run = runMap.get(profile.getJobId());
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
index 895135f..19fd95b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
@@ -14,16 +14,15 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.UUID;
-
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
public class TaskCompleteEvent extends AbstractTaskLifecycleEvent {
- public TaskCompleteEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId) {
+ public TaskCompleteEvent(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId) {
super(ccs, jobId, taId, nodeId);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
index 1aa19b6..8b7e821 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
@@ -14,9 +14,8 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.UUID;
-
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
@@ -24,7 +23,7 @@
public class TaskFailureEvent extends AbstractTaskLifecycleEvent {
private final Exception exception;
- public TaskFailureEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId,
+ public TaskFailureEvent(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
Exception exception) {
super(ccs, jobId, taId, nodeId);
this.exception = exception;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
index 88a94ec..414cf12 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
@@ -14,16 +14,15 @@
*/
package edu.uci.ics.hyracks.control.cc.remote.ops;
-import java.util.UUID;
-
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
import edu.uci.ics.hyracks.control.common.base.INodeController;
public class JobCompleteNotifier implements RemoteOp<Void> {
private String nodeId;
- private UUID jobId;
+ private JobId jobId;
- public JobCompleteNotifier(String nodeId, UUID jobId) {
+ public JobCompleteNotifier(String nodeId, JobId jobId) {
this.nodeId = nodeId;
this.jobId = jobId;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index dfb0e14..a8d8ab7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -23,7 +23,6 @@
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -42,6 +41,7 @@
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -398,7 +398,7 @@
private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
Executor executor = ccs.getExecutor();
- final UUID jobId = jobRun.getJobId();
+ final JobId jobId = jobRun.getJobId();
final JobActivityGraph jag = jobRun.getJobActivityGraph();
final String appName = jag.getApplicationName();
final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
@@ -449,7 +449,7 @@
abortTaskAttempts.add(taId);
}
}
- final UUID jobId = jobRun.getJobId();
+ final JobId jobId = jobRun.getJobId();
for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
final List<TaskAttemptId> abortTaskAttempts = e.getValue();
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
index b62b391..0d4dc77 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
@@ -14,10 +14,9 @@
*/
package edu.uci.ics.hyracks.control.cc.web;
-import java.util.UUID;
-
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobProfileJSONEvent;
import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobSpecificationJSONEvent;
@@ -47,7 +46,7 @@
}
case 2: {
- UUID jobId = UUID.fromString(arguments[0]);
+ JobId jobId = JobId.parse(arguments[0]);
if ("spec".equalsIgnoreCase(arguments[1])) {
GetJobSpecificationJSONEvent gjse = new GetJobSpecificationJSONEvent(ccs, jobId);
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 8dae67d..6ff4901 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -16,9 +16,9 @@
import java.rmi.Remote;
import java.util.List;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
@@ -31,10 +31,10 @@
public void unregisterNode(INodeController nodeController) throws Exception;
- public void notifyTaskComplete(UUID jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+ public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception;
- public void notifyTaskFailure(UUID jobId, TaskAttemptId taskId, String nodeId, Exception e) throws Exception;
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, Exception e) throws Exception;
public void nodeHeartbeat(String id) throws Exception;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 3979656..33548d9 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -17,12 +17,12 @@
import java.rmi.Remote;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NodeCapability;
@@ -35,12 +35,12 @@
public NodeCapability getNodeCapability() throws Exception;
- public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+ public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, byte[] ctxVarBytes) throws Exception;
- public void abortTasks(UUID jobId, List<TaskAttemptId> tasks) throws Exception;
+ public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
- public void cleanUpJob(UUID jobId) throws Exception;
+ public void cleanUpJob(JobId jobId) throws Exception;
public void notifyRegistration(IClusterController ccs) throws Exception;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
index cc0b1a2..5457ac5 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -2,24 +2,25 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.job.JobId;
+
public class JobProfile extends AbstractProfile {
private static final long serialVersionUID = 1L;
- private final UUID jobId;
+ private final JobId jobId;
private final Map<String, JobletProfile> jobletProfiles;
- public JobProfile(UUID jobId) {
+ public JobProfile(JobId jobId) {
this.jobId = jobId;
jobletProfiles = new HashMap<String, JobletProfile>();
}
- public UUID getJobId() {
+ public JobId getJobId() {
return jobId;
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index c932889..3026359 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -19,7 +19,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.Executor;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
@@ -34,6 +33,7 @@
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.naming.MultipartName;
@@ -55,7 +55,7 @@
private final INCApplicationContext appCtx;
- private final UUID jobId;
+ private final JobId jobId;
private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
@@ -71,7 +71,7 @@
private final IWorkspaceFileFactory fileFactory;
- public Joblet(NodeControllerService nodeController, UUID jobId, INCApplicationContext appCtx) {
+ public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx) {
this.nodeController = nodeController;
this.appCtx = appCtx;
this.jobId = jobId;
@@ -85,7 +85,7 @@
}
@Override
- public UUID getJobId() {
+ public JobId getJobId() {
return jobId;
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index e97eede..4eb1a03 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -31,7 +31,6 @@
import java.util.StringTokenizer;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Level;
@@ -67,6 +66,7 @@
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.naming.MultipartName;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
@@ -113,7 +113,7 @@
private IClusterController ccs;
- private final Map<UUID, Joblet> jobletMap;
+ private final Map<JobId, Joblet> jobletMap;
private final Executor executor;
@@ -136,7 +136,7 @@
partitionManager = new PartitionManager(this);
connectionManager.setPartitionRequestListener(partitionManager);
- jobletMap = new Hashtable<UUID, Joblet>();
+ jobletMap = new Hashtable<JobId, Joblet>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
NodeControllerService.class.getName()), id));
@@ -231,7 +231,7 @@
}
@Override
- public void startTasks(String appName, final UUID jobId, byte[] jagBytes,
+ public void startTasks(String appName, final JobId jobId, byte[] jagBytes,
List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
try {
@@ -263,8 +263,10 @@
final int partition = tid.getPartition();
Map<MultipartName, Object> inputGlobalVariables = createInputGlobalVariables(ctxVarMap, han);
Task task = new Task(joblet, taId, han.getClass().getName(), executor);
- IOperatorEnvironment env = joblet.getEnvironment(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition());
- IOperatorNodePushable operator = han.createPushRuntime(task, env, rdp, partition, td.getPartitionCount());
+ IOperatorEnvironment env = joblet.getEnvironment(tid.getActivityId().getOperatorDescriptorId(),
+ tid.getPartition());
+ IOperatorNodePushable operator = han.createPushRuntime(task, env, rdp, partition,
+ td.getPartitionCount());
List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
@@ -314,9 +316,9 @@
private Map<MultipartName, Object> createInputGlobalVariables(Map<MultipartName, Object> ctxVarMap, IActivity han) {
Map<MultipartName, Object> gVars = new HashMap<MultipartName, Object>();
-// for (MultipartName inVar : han.getInputVariables()) {
-// gVars.put(inVar, ctxVarMap.get(inVar));
-// }
+ // for (MultipartName inVar : han.getInputVariables()) {
+ // gVars.put(inVar, ctxVarMap.get(inVar));
+ // }
return gVars;
}
@@ -333,7 +335,7 @@
}
}
- private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final UUID jobId,
+ private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final JobId jobId,
final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId) {
if (cPolicy.materializeOnSendSide()) {
return new IPartitionWriterFactory() {
@@ -354,7 +356,7 @@
}
}
- private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, INCApplicationContext appCtx) throws Exception {
+ private synchronized Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx) throws Exception {
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
ji = new Joblet(this, jobId, appCtx);
@@ -368,7 +370,7 @@
}
@Override
- public void cleanUpJob(UUID jobId) throws Exception {
+ public void cleanUpJob(JobId jobId) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Cleaning up after job: " + jobId);
}
@@ -379,7 +381,7 @@
}
}
- public void notifyTaskComplete(UUID jobId, TaskAttemptId taskId, TaskProfile taskProfile) throws Exception {
+ public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, TaskProfile taskProfile) throws Exception {
try {
ccs.notifyTaskComplete(jobId, taskId, id, taskProfile);
} catch (Exception e) {
@@ -388,7 +390,7 @@
}
}
- public void notifyTaskFailed(UUID jobId, TaskAttemptId taskId, Exception exception) {
+ public void notifyTaskFailed(JobId jobId, TaskAttemptId taskId, Exception exception) {
try {
ccs.notifyTaskFailure(jobId, taskId, id, exception);
} catch (Exception e) {
@@ -461,7 +463,7 @@
}
@Override
- public synchronized void abortTasks(UUID jobId, List<TaskAttemptId> tasks) throws Exception {
+ public synchronized void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
index 15512ae..b9eaf1f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
@@ -29,7 +29,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -37,13 +36,14 @@
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
public class ConnectionManager {
private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
- static final int INITIAL_MESSAGE_SIZE = 28;
+ static final int INITIAL_MESSAGE_SIZE = 20;
private final IHyracksRootContext ctx;
@@ -231,18 +231,12 @@
}
private PartitionId readInitialMessage(ByteBuffer buffer) {
- UUID jobId = readUUID(buffer);
+ JobId jobId = new JobId(buffer.getLong());
ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
int senderIndex = buffer.getInt();
int receiverIndex = buffer.getInt();
return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
}
-
- private UUID readUUID(ByteBuffer buffer) {
- long msb = buffer.getLong();
- long lsb = buffer.getLong();
- return new UUID(msb, lsb);
- }
}
public NetworkAddress getNetworkAddress() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 28ac22f..58f7088 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -21,7 +21,6 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Queue;
-import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -188,7 +187,7 @@
private void prepareForWrite() {
writeBuffer = ByteBuffer.allocate(ConnectionManager.INITIAL_MESSAGE_SIZE);
- writeUUID(writeBuffer, partitionId.getJobId());
+ writeBuffer.putLong(partitionId.getJobId().getId());
writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
writeBuffer.putInt(partitionId.getSenderIndex());
writeBuffer.putInt(partitionId.getReceiverIndex());
@@ -197,11 +196,6 @@
key.interestOps(SelectionKey.OP_WRITE);
}
- private void writeUUID(ByteBuffer buffer, UUID uuid) {
- buffer.putLong(uuid.getMostSignificantBits());
- buffer.putLong(uuid.getLeastSignificantBits());
- }
-
@Override
public void setSelectionKey(SelectionKey key) {
this.key = key;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index 63ae790..baa50a1 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -19,13 +19,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
@@ -79,7 +79,7 @@
return partitionMap.get(pid).get(0);
}
- public synchronized void unregisterPartitions(UUID jobId) {
+ public synchronized void unregisterPartitions(JobId jobId) {
for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = partitionMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<PartitionId, List<IPartition>> e = i.next();
PartitionId pid = e.getKey();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index 6b7eb4e..c108bef 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -17,7 +17,6 @@
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
-import java.util.UUID;
import java.util.concurrent.Executor;
import edu.uci.ics.hyracks.api.channels.IInputChannel;
@@ -29,6 +28,7 @@
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class ReceiveSideMaterializingCollector implements IPartitionCollector {
@@ -52,7 +52,7 @@
}
@Override
- public UUID getJobId() {
+ public JobId getJobId() {
return delegate.getJobId();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/AbstractPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/AbstractPartitionCollector.java
index fbc6fc4..b2dad8f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/AbstractPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/AbstractPartitionCollector.java
@@ -14,11 +14,10 @@
*/
package edu.uci.ics.hyracks.dataflow.std.collectors;
-import java.util.UUID;
-
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobId;
public abstract class AbstractPartitionCollector implements IPartitionCollector {
protected final IHyracksTaskContext ctx;
@@ -34,7 +33,7 @@
}
@Override
- public UUID getJobId() {
+ public JobId getJobId() {
return ctx.getJobletContext().getJobId();
}
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index 80f0d97..957f96d 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -15,8 +15,6 @@
package edu.uci.ics.hyracks.examples.btree.client;
-import java.util.UUID;
-
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -30,6 +28,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -90,7 +89,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(options.app, job);
+ JobId jobId = hcc.createJob(options.app, job);
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index bf3e04c..00538f8 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.hyracks.examples.btree.client;
-import java.util.UUID;
-
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -29,6 +27,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
@@ -87,7 +86,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(options.app, job);
+ JobId jobId = hcc.createJob(options.app, job);
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexEnlistFilesExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexEnlistFilesExample.java
index a7ee0dc..e063a27 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexEnlistFilesExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexEnlistFilesExample.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.hyracks.examples.btree.client;
-import java.util.UUID;
-
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -26,6 +24,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
@@ -72,7 +71,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(options.app, job);
+ JobId jobId = hcc.createJob(options.app, job);
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index b43efd2..00a3766 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.examples.btree.client;
import java.io.DataOutput;
-import java.util.UUID;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -28,6 +27,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
@@ -79,7 +79,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(options.app, job);
+ JobId jobId = hcc.createJob(options.app, job);
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 4940c2a..6fc6d10 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.hyracks.examples.btree.client;
-import java.util.UUID;
-
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -26,6 +24,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -83,7 +82,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(options.app, job);
+ JobId jobId = hcc.createJob(options.app, job);
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index a770f09..d78951f 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.examples.btree.client;
import java.io.DataOutput;
-import java.util.UUID;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -28,6 +27,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -82,7 +82,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(options.app, job);
+ JobId jobId = hcc.createJob(options.app, job);
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 40b40db..237ab52 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.tests.integration;
import java.util.EnumSet;
-import java.util.UUID;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -23,6 +22,7 @@
import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
@@ -74,7 +74,7 @@
}
protected void runTest(JobSpecification spec) throws Exception {
- UUID jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ JobId jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
System.err.println(spec.toJSON().toString(2));
hcc.start(jobId);
System.err.print(jobId);
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 235d966..f591949 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.examples.text.client;
import java.io.File;
-import java.util.UUID;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -30,6 +29,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.hash.IntegerBinaryHashFunctionFactory;
@@ -116,7 +116,7 @@
System.out.print(i + "\t" + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(options.app, job);
+ JobId jobId = hcc.createJob(options.app, job);
hcc.start(jobId);
hcc.waitForCompletion(jobId);
System.out.println("\t" + (System.currentTimeMillis() - start));
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index c13fcad..51d3661 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.examples.text.client;
import java.io.File;
-import java.util.UUID;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -30,6 +29,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
@@ -95,7 +95,7 @@
options.algo, options.htSize, options.sbSize, options.format);
long start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(options.app, job);
+ JobId jobId = hcc.createJob(options.app, job);
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index d476a9d..09d0e5a 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -16,7 +16,6 @@
import java.io.File;
import java.util.EnumSet;
-import java.util.UUID;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -32,6 +31,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
@@ -93,7 +93,7 @@
options.numJoinPartitions);
long start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(options.app, job,
+ JobId jobId = hcc.createJob(options.app, job,
options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.start(jobId);
hcc.waitForCompletion(jobId);
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index a363221..9e76a3e 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -4,10 +4,10 @@
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
@@ -15,77 +15,73 @@
public class HyracksClient {
- private static HyracksRMIConnection connection;
- private static final String jobProfilingKey = "jobProfilingKey";
- Set<String> systemLibs;
+ private static HyracksRMIConnection connection;
+ private static final String jobProfilingKey = "jobProfilingKey";
+ Set<String> systemLibs;
- public HyracksClient(Properties clusterProperties) throws Exception {
- initialize(clusterProperties);
- }
+ public HyracksClient(Properties clusterProperties) throws Exception {
+ initialize(clusterProperties);
+ }
- private void initialize(Properties properties) throws Exception {
- String clusterController = (String) properties
- .get(ConfigurationConstants.clusterControllerHost);
- connection = new HyracksRMIConnection(clusterController, 1099);
- systemLibs = new HashSet<String>();
- for (String systemLib : ConfigurationConstants.systemLibs) {
- String systemLibPath = properties.getProperty(systemLib);
- if (systemLibPath != null) {
- systemLibs.add(systemLibPath);
- }
- }
- }
+ private void initialize(Properties properties) throws Exception {
+ String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
+ connection = new HyracksRMIConnection(clusterController, 1099);
+ systemLibs = new HashSet<String>();
+ for (String systemLib : ConfigurationConstants.systemLibs) {
+ String systemLibPath = properties.getProperty(systemLib);
+ if (systemLibPath != null) {
+ systemLibs.add(systemLibPath);
+ }
+ }
+ }
- public HyracksClient(String clusterConf, char delimiter) throws Exception {
- Properties properties = Utilities.getProperties(clusterConf, delimiter);
- initialize(properties);
- }
+ public HyracksClient(String clusterConf, char delimiter) throws Exception {
+ Properties properties = Utilities.getProperties(clusterConf, delimiter);
+ initialize(properties);
+ }
- private Set<String> getRequiredLibs(Set<String> userLibs) {
- Set<String> requiredLibs = new HashSet<String>();
- for (String systemLib : systemLibs) {
- requiredLibs.add(systemLib);
- }
- for (String userLib : userLibs) {
- requiredLibs.add(userLib);
- }
- return requiredLibs;
- }
+ private Set<String> getRequiredLibs(Set<String> userLibs) {
+ Set<String> requiredLibs = new HashSet<String>();
+ for (String systemLib : systemLibs) {
+ requiredLibs.add(systemLib);
+ }
+ for (String userLib : userLibs) {
+ requiredLibs.add(userLib);
+ }
+ return requiredLibs;
+ }
- public JobStatus getJobStatus(UUID jobId) throws Exception {
- return connection.getJobStatus(jobId);
- }
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
+ return connection.getJobStatus(jobId);
+ }
- private void createApplication(String applicationName, Set<String> userLibs)
- throws Exception {
- connection.createApplication(applicationName, Utilities
- .getHyracksArchive(applicationName, getRequiredLibs(userLibs)));
- }
+ private void createApplication(String applicationName, Set<String> userLibs) throws Exception {
+ connection.createApplication(applicationName,
+ Utilities.getHyracksArchive(applicationName, getRequiredLibs(userLibs)));
+ }
- public HyracksRunningJob submitJob(String applicationName,
- JobSpecification spec) throws Exception {
- String jobProfilingVal = System.getenv(jobProfilingKey);
- boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
- UUID jobId;
- if (doProfiling) {
- System.out.println("PROFILING");
- jobId = connection.createJob(applicationName, spec, EnumSet
- .of(JobFlag.PROFILE_RUNTIME));
- } else {
- jobId = connection.createJob(applicationName, spec);
- }
- connection.start(jobId);
- HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
- return runningJob;
- }
+ public HyracksRunningJob submitJob(String applicationName, JobSpecification spec) throws Exception {
+ String jobProfilingVal = System.getenv(jobProfilingKey);
+ boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
+ JobId jobId;
+ if (doProfiling) {
+ System.out.println("PROFILING");
+ jobId = connection.createJob(applicationName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ } else {
+ jobId = connection.createJob(applicationName, spec);
+ }
+ connection.start(jobId);
+ HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
+ return runningJob;
+ }
- public HyracksRunningJob submitJob(String applicationName,
- JobSpecification spec, Set<String> userLibs) throws Exception {
- createApplication(applicationName, userLibs);
- return submitJob(applicationName, spec);
- }
+ public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> userLibs)
+ throws Exception {
+ createApplication(applicationName, userLibs);
+ return submitJob(applicationName, spec);
+ }
- public void waitForCompleton(UUID jobId) throws Exception {
- connection.waitForCompletion(jobId);
- }
+ public void waitForCompleton(JobId jobId) throws Exception {
+ connection.waitForCompletion(jobId);
+ }
}
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
index 8470e12..7c650ec 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
@@ -1,7 +1,6 @@
package edu.uci.ics.hyracks.hadoop.compat.client;
import java.io.IOException;
-import java.util.UUID;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobID;
@@ -9,19 +8,20 @@
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskCompletionEvent;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
public class HyracksRunningJob implements RunningJob {
- UUID jobId;
+ JobId jobId;
JobSpecification spec;
HyracksClient hyracksClient;
- public UUID getJobId() {
+ public JobId getJobId() {
return jobId;
}
- public void setJobId(UUID jobId) {
+ public void setJobId(JobId jobId) {
this.jobId = jobId;
}
@@ -33,7 +33,7 @@
this.spec = spec;
}
- public HyracksRunningJob(UUID jobId, JobSpecification jobSpec, HyracksClient hyracksClient) {
+ public HyracksRunningJob(JobId jobId, JobSpecification jobSpec, HyracksClient hyracksClient) {
this.spec = jobSpec;
this.jobId = jobId;
this.hyracksClient = hyracksClient;
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
index 37f4d34..bf09bb0 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -6,15 +6,16 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
-import java.util.UUID;
-import java.util.Map.Entry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.kohsuke.args4j.CmdLineParser;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.hadoop.compat.client.HyracksClient;
import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
import edu.uci.ics.hyracks.hadoop.compat.util.CompatibilityConfig;
@@ -22,187 +23,168 @@
import edu.uci.ics.hyracks.hadoop.compat.util.DCacheHandler;
import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
public class CompatibilityLayer {
- HyracksClient hyracksClient;
- DCacheHandler dCacheHander = null;
- Properties clusterConf;
- HadoopAdapter hadoopAdapter;
+ HyracksClient hyracksClient;
+ DCacheHandler dCacheHander = null;
+ Properties clusterConf;
+ HadoopAdapter hadoopAdapter;
- private static char configurationFileDelimiter = '=';
- private static final String dacheKeyPrefix = "dcache.key";
+ private static char configurationFileDelimiter = '=';
+ private static final String dacheKeyPrefix = "dcache.key";
- public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
- initialize(clConfig);
- }
+ public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
+ initialize(clConfig);
+ }
- private void initialize(CompatibilityConfig clConfig) throws Exception {
- clusterConf = Utilities.getProperties(clConfig.clusterConf,
- configurationFileDelimiter);
- hadoopAdapter = new HadoopAdapter(clusterConf
- .getProperty(ConfigurationConstants.namenodeURL));
- hyracksClient = new HyracksClient(clusterConf);
- dCacheHander = new DCacheHandler(clusterConf
- .getProperty(ConfigurationConstants.dcacheServerConfiguration));
- }
+ private void initialize(CompatibilityConfig clConfig) throws Exception {
+ clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
+ hadoopAdapter = new HadoopAdapter(clusterConf.getProperty(ConfigurationConstants.namenodeURL));
+ hyracksClient = new HyracksClient(clusterConf);
+ dCacheHander = new DCacheHandler(clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration));
+ }
- public HyracksRunningJob submitJob(JobConf conf,Set<String> userLibs) throws Exception {
- List<JobConf> jobConfs = new ArrayList<JobConf>();
- jobConfs.add(conf);
- String applicationName = conf.getJobName() + System.currentTimeMillis();
- JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
- HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
- applicationName, spec, userLibs);
- return hyracksRunningJob;
- }
-
- public HyracksRunningJob submitJobs(String applicationName,
- String[] jobFiles, Set<String> userLibs) throws Exception {
- List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
- populateDCache(jobFiles[0]);
- JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
- HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
- applicationName, spec, userLibs);
- return hyracksRunningJob;
- }
+ public HyracksRunningJob submitJob(JobConf conf, Set<String> userLibs) throws Exception {
+ List<JobConf> jobConfs = new ArrayList<JobConf>();
+ jobConfs.add(conf);
+ String applicationName = conf.getJobName() + System.currentTimeMillis();
+ JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+ HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs);
+ return hyracksRunningJob;
+ }
- public HyracksRunningJob submitJobs(String applicationName,
- String[] jobFiles) throws Exception {
- List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
- populateDCache(jobFiles[0]);
- JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
- HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
- applicationName, spec);
- return hyracksRunningJob;
- }
+ public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles, Set<String> userLibs)
+ throws Exception {
+ List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+ populateDCache(jobFiles[0]);
+ JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+ HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs);
+ return hyracksRunningJob;
+ }
- private void populateDCache(String jobFile) throws IOException {
- Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
- String tempDir = "/tmp";
- if (dcacheTasks.size() > 0) {
- for (String key : dcacheTasks.keySet()) {
- String destPath = tempDir + "/" + key
- + System.currentTimeMillis();
- hadoopAdapter.getHDFSClient().copyToLocalFile(
- new Path(dcacheTasks.get(key)), new Path(destPath));
- System.out.println(" source :" + dcacheTasks.get(key));
- System.out.println(" dest :" + destPath);
- System.out.println(" key :" + key);
- System.out.println(" value :" + destPath);
- dCacheHander.put(key, destPath);
- }
- }
- }
+ public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles) throws Exception {
+ List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+ populateDCache(jobFiles[0]);
+ JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+ HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec);
+ return hyracksRunningJob;
+ }
- private String getApplicationNameForHadoopJob(JobConf jobConf) {
- String jar = jobConf.getJar();
- if (jar != null) {
- return jar.substring(jar.lastIndexOf("/") >= 0 ? jar
- .lastIndexOf("/") + 1 : 0);
- } else {
- return "" + System.currentTimeMillis();
- }
- }
+ private void populateDCache(String jobFile) throws IOException {
+ Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
+ String tempDir = "/tmp";
+ if (dcacheTasks.size() > 0) {
+ for (String key : dcacheTasks.keySet()) {
+ String destPath = tempDir + "/" + key + System.currentTimeMillis();
+ hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath));
+ System.out.println(" source :" + dcacheTasks.get(key));
+ System.out.println(" dest :" + destPath);
+ System.out.println(" key :" + key);
+ System.out.println(" value :" + destPath);
+ dCacheHander.put(key, destPath);
+ }
+ }
+ }
- private Map<String, String> initializeCustomProperties(
- Properties properties, String prefix) {
- Map<String, String> foundProperties = new HashMap<String, String>();
- Set<Entry<Object, Object>> entrySet = properties.entrySet();
- for (Entry entry : entrySet) {
- String key = (String) entry.getKey();
- String value = (String) entry.getValue();
- if ((key.startsWith(prefix))) {
- String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
- foundProperties.put(actualKey, value);
- }
- }
- return foundProperties;
- }
+ private String getApplicationNameForHadoopJob(JobConf jobConf) {
+ String jar = jobConf.getJar();
+ if (jar != null) {
+ return jar.substring(jar.lastIndexOf("/") >= 0 ? jar.lastIndexOf("/") + 1 : 0);
+ } else {
+ return "" + System.currentTimeMillis();
+ }
+ }
- public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
- Properties jobProperties = Utilities.getProperties(jobFile, ',');
- Map<String, String> dcacheTasks = new HashMap<String, String>();
- Map<String, String> dcacheKeys = initializeCustomProperties(
- jobProperties, dacheKeyPrefix);
- for (String key : dcacheKeys.keySet()) {
- String sourcePath = dcacheKeys.get(key);
- if (sourcePath != null) {
- dcacheTasks.put(key, sourcePath);
- }
- }
- return dcacheTasks;
- }
+ private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
+ Map<String, String> foundProperties = new HashMap<String, String>();
+ Set<Entry<Object, Object>> entrySet = properties.entrySet();
+ for (Entry entry : entrySet) {
+ String key = (String) entry.getKey();
+ String value = (String) entry.getValue();
+ if ((key.startsWith(prefix))) {
+ String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
+ foundProperties.put(actualKey, value);
+ }
+ }
+ return foundProperties;
+ }
- public void waitForCompletion(UUID jobId) throws Exception {
- hyracksClient.waitForCompleton(jobId);
- }
+ public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
+ Properties jobProperties = Utilities.getProperties(jobFile, ',');
+ Map<String, String> dcacheTasks = new HashMap<String, String>();
+ Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dacheKeyPrefix);
+ for (String key : dcacheKeys.keySet()) {
+ String sourcePath = dcacheKeys.get(key);
+ if (sourcePath != null) {
+ dcacheTasks.put(key, sourcePath);
+ }
+ }
+ return dcacheTasks;
+ }
- private List<JobConf> constructHadoopJobConfs(String[] jobFiles)
- throws Exception {
- List<JobConf> jobConfs = new ArrayList<JobConf>();
- for (String jobFile : jobFiles) {
- jobConfs.add(constructHadoopJobConf(jobFile));
- }
- return jobConfs;
- }
+ public void waitForCompletion(JobId jobId) throws Exception {
+ hyracksClient.waitForCompleton(jobId);
+ }
- private JobConf constructHadoopJobConf(String jobFile) {
- Properties jobProperties = Utilities.getProperties(jobFile, '=');
- JobConf conf = new JobConf(hadoopAdapter.getConf());
- for (Entry entry : jobProperties.entrySet()) {
- conf.set((String) entry.getKey(), (String) entry.getValue());
- System.out.println((String) entry.getKey() + " : "
- + (String) entry.getValue());
- }
- return conf;
- }
+ private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
+ List<JobConf> jobConfs = new ArrayList<JobConf>();
+ for (String jobFile : jobFiles) {
+ jobConfs.add(constructHadoopJobConf(jobFile));
+ }
+ return jobConfs;
+ }
- private String[] getJobs(CompatibilityConfig clConfig) {
- return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles
- .split(",");
- }
+ private JobConf constructHadoopJobConf(String jobFile) {
+ Properties jobProperties = Utilities.getProperties(jobFile, '=');
+ JobConf conf = new JobConf(hadoopAdapter.getConf());
+ for (Entry entry : jobProperties.entrySet()) {
+ conf.set((String) entry.getKey(), (String) entry.getValue());
+ System.out.println((String) entry.getKey() + " : " + (String) entry.getValue());
+ }
+ return conf;
+ }
- public static void main(String args[]) throws Exception {
- long startTime = System.nanoTime();
- CompatibilityConfig clConfig = new CompatibilityConfig();
- CmdLineParser cp = new CmdLineParser(clConfig);
- try {
- cp.parseArgument(args);
- } catch (Exception e) {
- System.err.println(e.getMessage());
- cp.printUsage(System.err);
- return;
- }
- CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
- String applicationName = clConfig.applicationName;
- String[] jobFiles = compatLayer.getJobs(clConfig);
- String[] userLibraries = null;
- if (clConfig.userLibs != null) {
- userLibraries = clConfig.userLibs.split(",");
- }
- try {
- HyracksRunningJob hyraxRunningJob = null;
- if (userLibraries != null) {
- Set<String> userLibs = new HashSet<String>();
- for (String userLib : userLibraries) {
- userLibs.add(userLib);
- }
- hyraxRunningJob = compatLayer.submitJobs(applicationName,
- jobFiles, userLibs);
- } else {
- hyraxRunningJob = compatLayer.submitJobs(applicationName,
- jobFiles);
- }
- compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
- long end_time = System.nanoTime();
- System.out.println("TOTAL TIME (from Launch to Completion):"
- + ((end_time - startTime) / (float) 1000000000.0)
- + " seconds.");
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
+ private String[] getJobs(CompatibilityConfig clConfig) {
+ return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
+ }
+
+ public static void main(String args[]) throws Exception {
+ long startTime = System.nanoTime();
+ CompatibilityConfig clConfig = new CompatibilityConfig();
+ CmdLineParser cp = new CmdLineParser(clConfig);
+ try {
+ cp.parseArgument(args);
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ cp.printUsage(System.err);
+ return;
+ }
+ CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
+ String applicationName = clConfig.applicationName;
+ String[] jobFiles = compatLayer.getJobs(clConfig);
+ String[] userLibraries = null;
+ if (clConfig.userLibs != null) {
+ userLibraries = clConfig.userLibs.split(",");
+ }
+ try {
+ HyracksRunningJob hyraxRunningJob = null;
+ if (userLibraries != null) {
+ Set<String> userLibs = new HashSet<String>();
+ for (String userLib : userLibraries) {
+ userLibs.add(userLib);
+ }
+ hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles, userLibs);
+ } else {
+ hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles);
+ }
+ compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
+ long end_time = System.nanoTime();
+ System.out.println("TOTAL TIME (from Launch to Completion):"
+ + ((end_time - startTime) / (float) 1000000000.0) + " seconds.");
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
index 620e1a5..4f84466 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.test.support;
import java.nio.ByteBuffer;
-import java.util.UUID;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
@@ -23,6 +22,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -30,10 +30,10 @@
public class TestJobletContext implements IHyracksJobletContext {
private final INCApplicationContext appContext;
- private UUID jobId;
+ private JobId jobId;
private WorkspaceFileFactory fileFactory;
- public TestJobletContext(INCApplicationContext appContext, UUID jobId) throws HyracksException {
+ public TestJobletContext(INCApplicationContext appContext, JobId jobId) throws HyracksException {
this.appContext = appContext;
this.jobId = jobId;
fileFactory = new WorkspaceFileFactory(this, (IOManager) getIOManager());
@@ -85,7 +85,7 @@
}
@Override
- public UUID getJobId() {
+ public JobId getJobId() {
return jobId;
}
}
\ No newline at end of file
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
index 83ae7d5..5d488c1 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.hyracks.test.support;
-import java.util.UUID;
-
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
@@ -25,13 +23,14 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
public class TestUtils {
public static IHyracksTaskContext create(int frameSize) {
try {
IHyracksRootContext rootCtx = new TestRootContext(frameSize);
INCApplicationContext appCtx = new TestNCApplicationContext(rootCtx, null);
- IHyracksJobletContext jobletCtx = new TestJobletContext(appCtx, UUID.randomUUID());
+ IHyracksJobletContext jobletCtx = new TestJobletContext(appCtx, new JobId(0));
TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), 0), 0);
IHyracksTaskContext taskCtx = new TestTaskContext(jobletCtx, tid);
return taskCtx;