Refactored JobProfile construction
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@662 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 50c2c32..9a9aee2 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -57,7 +57,7 @@
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -65,6 +65,7 @@
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
import edu.uci.ics.hyracks.control.nc.work.CleanupJobWork;
import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
import edu.uci.ics.hyracks.control.nc.work.DestroyApplicationWork;
@@ -324,24 +325,10 @@
@Override
public void run() {
try {
- List<JobProfile> profiles;
- synchronized (NodeControllerService.this) {
- profiles = new ArrayList<JobProfile>();
- for (Joblet ji : jobletMap.values()) {
- profiles.add(new JobProfile(ji.getJobId()));
- }
- }
- for (JobProfile jProfile : profiles) {
- Joblet ji;
- JobletProfile jobletProfile = new JobletProfile(id);
- synchronized (NodeControllerService.this) {
- ji = jobletMap.get(jProfile.getJobId());
- }
- if (ji != null) {
- ji.dumpProfile(jobletProfile);
- jProfile.getJobletProfiles().put(id, jobletProfile);
- }
- }
+ FutureValue fv = new FutureValue();
+ BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
+ queue.scheduleAndSync(bjpw);
+ List<JobProfile> profiles = (List<JobProfile>) fv.get();
if (!profiles.isEmpty()) {
cc.reportProfile(id, profiles);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
new file mode 100644
index 0000000..0aee348
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class BuildJobProfilesWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(BuildJobProfilesWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final FutureValue fv;
+
+ public BuildJobProfilesWork(NodeControllerService ncs, FutureValue fv) {
+ this.ncs = ncs;
+ this.fv = fv;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ List<JobProfile> profiles = new ArrayList<JobProfile>();
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ for (Joblet ji : jobletMap.values()) {
+ profiles.add(new JobProfile(ji.getJobId()));
+ }
+ for (JobProfile jProfile : profiles) {
+ Joblet ji;
+ JobletProfile jobletProfile = new JobletProfile(ncs.getId());
+ ji = jobletMap.get(jProfile.getJobId());
+ if (ji != null) {
+ ji.dumpProfile(jobletProfile);
+ jProfile.getJobletProfiles().put(ncs.getId(), jobletProfile);
+ }
+ }
+ fv.setValue(profiles);
+ }
+}
\ No newline at end of file